summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py2281
1 files changed, 1482 insertions, 799 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 58df14954bb..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1,50 +1,88 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
import stat
-import random
-import signal
-import json
import logging
-import socket
+import fcntl
import string
import errno
-from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST
-from threading import currentThread, Condition, Lock
+import tarfile
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
+from threading import Condition, Lock
from datetime import datetime
-from gconf import gconf
-from tempfile import mkdtemp, NamedTemporaryFile
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
- unescape, select, gauxpfx, md5hex, selfkill, entry2pb
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
URXTIME = (-1, 0)
+# Default rollover time set in changelog translator
+# changelog rollover time is hardcoded here to avoid the
+# xsync usage when crawling switch happens from history
+# to changelog. If rollover time increased in translator
+# then geo-rep can enter into xsync crawl after history
+# crawl before starting live changelog crawl.
+CHANGELOG_ROLLOVER_TIME = 15
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
+
def _xtime_now():
t = time.time()
sec = int(t)
nsec = int((t - sec) * 1000000)
return (sec, nsec)
+
def _volinfo_hook_relax_foreign(self):
volinfo_sys = self.get_sys_volinfo()
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' % \
- expiry)
+ logging.info(lf('foreign volume info found, waiting for expiry',
+ expiry=expiry))
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- if self.inter_master:
- raise GsyncdError("cannot be intermediate master in special mode")
- return (volinfo_sys, state_change)
+ return volinfo_sys
+
+
+def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ dst['atime'] = st.st_atime
+ dst['mtime'] = st.st_mtime
+ else:
+ dct[k] = ed[k]
+ return dct
# The API!
@@ -53,17 +91,41 @@ def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
- modemixin = gconf.special_sync_mode
+ modemixin = gconf.get("special-sync-mode")
if not modemixin:
modemixin = 'normal'
- changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector
- logging.info('setting up %s change detection mode' % changemixin)
+
+ if gconf.get("change-detector") == 'xsync':
+ changemixin = 'xsync'
+ elif excrawl:
+ changemixin = excrawl
+ else:
+ changemixin = gconf.get("change-detector")
+
+ logging.debug(lf('setting up change detection mode',
+ mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
- sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
- purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
+
+ if gconf.get("use-rsync-xattrs"):
+ sendmarkmixin = SendmarkRsyncMixin
+ else:
+ sendmarkmixin = SendmarkNormalMixin
+
+ if gconf.get("ignore-deletes"):
+ purgemixin = PurgeNoopMixin
+ else:
+ purgemixin = PurgeNormalMixin
+
+ if gconf.get("sync-method") == "tarssh":
+ syncengine = TarSSHEngine
+ else:
+ syncengine = RsyncEngine
+
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+ purgemixin, syncengine):
pass
+
return _GMaster
@@ -72,6 +134,7 @@ def gmaster_builder(excrawl=None):
# sync modes
class NormalMixin(object):
+
"""normal geo-rep behavior"""
minus_infinity = URXTIME
@@ -99,21 +162,30 @@ class NormalMixin(object):
return xt0 >= xt1
def make_xtime_opts(self, is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = is_master
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
- def xtime_low(self, server, path, **opts):
- xt = server.xtime(path, self.uuid)
+ def xtime_low(self, rsc, path, **opts):
+ if rsc == self.master:
+ xt = rsc.server.xtime(path, self.uuid)
+ else:
+ xt = rsc.server.stime(path, self.uuid)
+ if isinstance(xt, int) and xt == ENODATA:
+ xt = rsc.server.xtime(path, self.uuid)
+ if not isinstance(xt, int):
+ self.slave.server.set_stime(path, self.uuid, xt)
if isinstance(xt, int) and xt != ENODATA:
return xt
if xt == ENODATA or xt < self.volmark:
if opts['create']:
xt = _xtime_now()
- server.aggregated.set_xtime(path, self.uuid, xt)
+ rsc.server.aggregated.set_xtime(path, self.uuid, xt)
else:
- xt = opts['default_xtime']
+ zero_zero = (0, 0)
+ if xt != zero_zero:
+ xt = opts['default_xtime']
return xt
def keepalive_payload_hook(self, timo, gap):
@@ -125,7 +197,7 @@ class NormalMixin(object):
vi = vi.copy()
vi['timeout'] = int(time.time()) + timo
else:
- # send keep-alives more frequently to
+ # send keep-alive more frequently to
# avoid a delay in announcing our volume info
# to slave if it becomes established in the
# meantime
@@ -133,10 +205,7 @@ class NormalMixin(object):
return (vi, gap)
def volinfo_hook(self):
- volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- return (volinfo_sys, state_change)
+ return self.get_sys_volinfo()
def xtime_reversion_hook(self, path, xtl, xtr):
if xtr > xtl:
@@ -146,186 +215,137 @@ class NormalMixin(object):
return xte > xtrd
def set_slave_xtime(self, path, mark):
- self.slave.server.set_xtime(path, self.uuid, mark)
+ self.slave.server.set_stime(path, self.uuid, mark)
+ # self.slave.server.set_xtime_remote(path, self.uuid, mark)
+
class PartialMixin(NormalMixin):
+
"""a variant tuned towards operation with a master
that has partial info of the slave (brick typically)"""
def xtime_reversion_hook(self, path, xtl, xtr):
pass
-class WrapupMixin(NormalMixin):
+
+class RecoverMixin(NormalMixin):
+
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@staticmethod
def make_xtime_opts(is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = False
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
- @staticmethod
def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
return _volinfo_hook_relax_foreign(self)
-class BlindMixin(object):
- """Geo-rep flavor using vectored xtime.
+# Further mixins for certain tunable behaviors
- Coordinates are the master, slave uuid pair;
- in master coordinate behavior is normal,
- in slave coordinate we force synchronization
- on any value difference (these are in disjunctive
- relation, ie. if either orders the entry to be
- synced, it shall be synced.
- """
- minus_infinity = (URXTIME, None)
+class SendmarkNormalMixin(object):
- @staticmethod
- def serialize_xtime(xt):
- a = []
- for x in xt:
- if not x:
- x = ('None', '')
- a.extend(x)
- return '.'.join(str(n) for n in a)
+ def sendmark_regular(self, *a, **kw):
+ return self.sendmark(*a, **kw)
- @staticmethod
- def deserialize_xtime(xt):
- a = xt.split(".")
- a = (tuple(a[0:2]), tuple(a[3:4]))
- b = []
- for p in a:
- if p[0] == 'None':
- p = None
- else:
- p = tuple(int(x) for x in p)
- b.append(p)
- return tuple(b)
- @staticmethod
- def native_xtime(xt):
- return xt[0]
+class SendmarkRsyncMixin(object):
- @staticmethod
- def xtime_geq(xt0, xt1):
- return (not xt1[0] or xt0[0] >= xt1[0]) and \
- (not xt1[1] or xt0[1] >= xt1[1])
+ def sendmark_regular(self, *a, **kw):
+ pass
- @property
- def ruuid(self):
- if self.volinfo_r:
- return self.volinfo_r['uuid']
- @staticmethod
- def make_xtime_opts(is_master, opts):
- if not 'create' in opts:
- opts['create'] = is_master
- if not 'default_xtime' in opts:
- opts['default_xtime'] = URXTIME
+class PurgeNormalMixin(object):
- def xtime_low(self, server, path, **opts):
- xtd = server.xtime_vec(path, self.uuid, self.ruuid)
- if isinstance(xtd, int):
- return xtd
- xt = (xtd[self.uuid], xtd[self.ruuid])
- if not xt[1] and (not xt[0] or xt[0] < self.volmark):
- if opts['create']:
- # not expected, but can happen if file originates
- # from interrupted gsyncd transfer
- logging.warn('have to fix up missing xtime on ' + path)
- xt0 = _xtime_now()
- server.aggregated.set_xtime(path, self.uuid, xt0)
- else:
- xt0 = opts['default_xtime']
- xt = (xt0, xt[1])
- return xt
+ def purge_missing(self, path, names):
+ self.slave.server.purge(path, names)
- @staticmethod
- def keepalive_payload_hook(self, timo, gap):
- return (None, gap)
- def volinfo_hook(self):
- res = _volinfo_hook_relax_foreign(self)
- volinfo_r_new = self.slave.server.aggregated.native_volume_info()
- if volinfo_r_new['retval']:
- raise GsyncdError("slave is corrupt")
- if getattr(self, 'volinfo_r', None):
- if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
- raise GsyncdError("uuid mismatch on slave")
- self.volinfo_r = volinfo_r_new
- return res
+class PurgeNoopMixin(object):
- def xtime_reversion_hook(self, path, xtl, xtr):
- if not isinstance(xtr[0], int) and \
- (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
- raise GsyncdError("timestamp corruption for " + path)
+ def purge_missing(self, path, names):
+ pass
- def need_sync(self, e, xte, xtrd):
- if xte[0]:
- if not xtrd[0] or xte[0] > xtrd[0]:
- # there is outstanding diff at 0th pos,
- # we can short-cut to true
- return True
- # we arrived to this point by either of these
- # two possiblilites:
- # - no outstanding difference at 0th pos,
- # wanna see 1st pos if he raises veto
- # against "no need to sync" proposal
- # - no data at 0th pos, 1st pos will have
- # to decide (due to xtime assignment,
- # in this case 1st pos does carry data
- # -- iow, if 1st pos did not have data,
- # and 0th neither, 0th would have been
- # force-feeded)
- if not xte[1]:
- # no data, no veto
- return False
- # the hard work: for 1st pos,
- # the conduct is fetch corresponding
- # slave data and do a "blind" comparison
- # (ie. do not care who is newer, we trigger
- # sync on non-identical xitmes)
- xtr = self.xtime(e, self.slave)
- return isinstance(xtr, int) or xte[1] != xtr[1]
- def set_slave_xtime(self, path, mark):
- xtd = {}
- for (u, t) in zip((self.uuid, self.ruuid), mark):
- if t:
- xtd[u] = t
- self.slave.server.set_xtime_vec(path, xtd)
+class TarSSHEngine(object):
+ """Sync engine that uses tar(1) piped over ssh(1)
+ for data transfers. Good for lots of small files.
+ """
-# Further mixins for certain tunable behaviors
+ def a_syncdata(self, files):
+ logging.debug(lf("Files", files=files))
-class SendmarkNormalMixin(object):
+ for f in files:
+ pb = self.syncer.add(f)
- def sendmark_regular(self, *a, **kw):
- return self.sendmark(*a, **kw)
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug(lf('synced', file=se))
+ return True
+ else:
+ # stat check for file presence
+ st = lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ self.unlinked_gfids.add(se)
+ return True
-class SendmarkRsyncMixin(object):
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
- def sendmark_regular(self, *a, **kw):
- pass
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
-class PurgeNormalMixin(object):
- def purge_missing(self, path, names):
- self.slave.server.purge(path, names)
+class RsyncEngine(object):
-class PurgeNoopMixin(object):
+ """Sync engine that uses rsync(1) for data transfers"""
+
+ def a_syncdata(self, files):
+ logging.debug(lf("files", files=files))
+
+ for f in files:
+ logging.debug(lf('candidate for syncing', file=f))
+ pb = self.syncer.add(f)
+
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug(lf('synced', file=se))
+ return True
+ else:
+ # stat to check if the file exist
+ st = lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ self.unlinked_gfids.add(se)
+ return True
+
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
- def purge_missing(self, path, names):
- pass
class GMasterCommon(object):
+
"""abstract class impementling master role"""
KFGN = 0
@@ -336,8 +356,9 @@ class GMasterCommon(object):
err out on multiple foreign masters
"""
- fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
- self.master.server.aggregated.native_volume_info()
+ fgn_vis, nat_vi = (
+ self.master.server.aggregated.foreign_volume_infos(),
+ self.master.server.aggregated.native_volume_info())
fgn_vi = None
if fgn_vis:
if len(fgn_vis) > 1:
@@ -355,12 +376,17 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
- @property
- def inter_master(self):
- """decide if we are an intermediate master
- in a cascading setup
- """
- return self.volinfo_state[self.KFGN] and True or False
+ def get_entry_stime(self):
+ data = self.slave.server.entry_stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+ def get_data_stime(self):
+ data = self.slave.server.stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -376,40 +402,17 @@ class GMasterCommon(object):
else:
rsc = self.master
self.make_xtime_opts(rsc == self.master, opts)
- return self.xtime_low(rsc.server, path, **opts)
-
- def get_initial_crawl_data(self):
- default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
- if getattr(gconf, 'state_detail_file', None):
- try:
- return json.load(open(gconf.state_detail_file))
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- # Create file with initial data
- with open(gconf.state_detail_file, 'wb') as f:
- json.dump(default_data, f)
- return default_data
- else:
- raise
-
- return default_data
-
- def update_crawl_data(self):
- if getattr(gconf, 'state_detail_file', None):
- try:
- same_dir = os.path.dirname(gconf.state_detail_file)
- with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
- json.dump(self.total_crawl_stats, tmp)
- os.rename(tmp.name, gconf.state_detail_file)
- except (IOError, OSError):
- raise
+ return self.xtime_low(rsc, path, **opts)
def __init__(self, master, slave):
self.master = master
self.slave = slave
self.jobtab = {}
- self.syncer = Syncer(slave)
+ if gconf.get("sync-method") == "tarssh":
+ self.syncer = Syncer(slave, self.slave.tarssh, [2])
+ else:
+ # partial transfer (cf. rsync(1)), that's normal
+ self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
# crawls vs. turns:
# - self.crawls is simply the number of crawl() invocations on root
# - one turn is a maximal consecutive sequence of crawls so that each
@@ -417,30 +420,24 @@ class GMasterCommon(object):
# - self.turns is the number of turns since start
# - self.total_turns is a limit so that if self.turns reaches it, then
# we exit (for diagnostic purposes)
- # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+ # so, eg., if the master fs changes unceasingly, self.turns will remain
+ # 0.
self.crawls = 0
self.turns = 0
- self.total_turns = int(gconf.turns)
+ self.total_turns = rconf.turns
+ self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
- 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
- self.total_crawl_stats = self.get_initial_crawl_data()
self.start = None
self.change_seen = None
- # the authoritative (foreign, native) volinfo pair
- # which lets us deduce what to do when we refetch
- # the volinfos from system
- uuid_preset = getattr(gconf, 'volume_id', None)
- self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.checkpoint_thread = None
+ self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
- timo = int(gconf.timeout or 0)
+ timo = gconf.get("slave-timeout", 0)
if timo > 0:
def keep_alive():
while True:
@@ -450,132 +447,182 @@ class GMasterCommon(object):
t = Thread(target=keep_alive)
t.start()
- def volinfo_query(self):
- """volume info state machine"""
- volinfo_sys, state_change = self.volinfo_hook()
- if self.inter_master:
- self.volinfo = volinfo_sys[self.KFGN]
- else:
- self.volinfo = volinfo_sys[self.KNAT]
- if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
- logging.info('new master is %s', self.uuid)
- if self.volinfo:
- logging.info("%s master with volume id %s ..." % \
- (self.inter_master and "intermediate" or "primary",
- self.uuid))
- if state_change == self.KFGN:
- gconf.configinterface.set('volume_id', self.uuid)
- if self.volinfo:
- if self.volinfo['retval']:
- raise GsyncdError ("master is corrupt")
- self.start_checkpoint_thread()
- else:
- if should_display_info or self.crawls == 0:
- if self.inter_master:
- logging.info("waiting for being synced from %s ..." % \
- self.volinfo_state[self.KFGN]['uuid'])
- else:
- logging.info("waiting for volume info ...")
- return True
+ def mgmt_lock(self):
+
+ """Take management volume lock """
+ if rconf.mgmt_lock_fd:
+ try:
+ fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ return True
+ except:
+ ex = sys.exc_info()[1]
+ if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+ return False
+ raise
+
+ fd = None
+ bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+ + str(rconf.args.subvol_num) + ".lock"
+ mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
+ path = os.path.join(mgmt_lock_dir, bname)
+ logging.debug(lf("lock file path", path=path))
+ try:
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ logging.info("Creating geo-rep directory in meta volume...")
+ try:
+ os.makedirs(mgmt_lock_dir)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST:
+ pass
+ else:
+ raise
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ else:
+ raise
+ try:
+ fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ # Save latest FD for future use
+ rconf.mgmt_lock_fd = fd
+ except:
+ ex = sys.exc_info()[1]
+ if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+ # cannot grab, it's taken
+ rconf.mgmt_lock_fd = fd
+ return False
+ raise
- def should_crawl(cls):
- return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+ return True
+
+ def should_crawl(self):
+ if not gconf.get("use-meta-volume"):
+ return rconf.args.local_node_id in self.master.server.node_uuid()
+
+ if not os.path.ismount(gconf.get("meta-volume-mnt")):
+ logging.error("Meta-volume is not mounted. Worker Exiting...")
+ sys.exit(1)
+ return self.mgmt_lock()
def register(self):
self.register()
- def crawlwrap(self, oneshot=False):
+ def crawlwrap(self, oneshot=False, register_time=None):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
+
+ # If crawlwrap is called when partial history available,
+ # then it sets register_time which is the time when geo-rep
+ # worker registered to changelog consumption. Since nsec is
+ # not considered in register time, there are chances of skipping
+ # changes detection in xsync crawl. This limit will be reset when
+ # crawlwrap is called again.
+ self.live_changelog_start_time = None
+ if register_time:
+ self.live_changelog_start_time = (register_time, 0)
+
+ # no need to maintain volinfo state machine.
+ # in a cascading setup, each geo-replication session is
+ # independent (ie. 'volume-mark' and 'xtime' are not
+ # propagated). This is because the slave's xtime is now
+ # stored on the master itself. 'volume-mark' just identifies
+ # that we are in a cascading setup and need to enable
+ # 'geo-replication.ignore-pid-check' option.
+ volinfo_sys = self.volinfo_hook()
+ self.volinfo = volinfo_sys[self.KNAT]
+ inter_master = volinfo_sys[self.KFGN]
+ logging.debug("%s master with volume id %s ..." %
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
+ rconf.volume_id = self.uuid
+ if self.volinfo:
+ if self.volinfo['retval']:
+ logging.warn(lf("master cluster's info may not be valid",
+ error=self.volinfo['retval']))
+ else:
+ raise GsyncdError("master volinfo unavailable")
self.lastreport['time'] = time.time()
- self.crawl_stats['crawl_starttime'] = datetime.now()
- logging.info('crawl interval: %d seconds' % self.sleep_interval)
t0 = time.time()
crawl = self.should_crawl()
while not self.terminate:
- if self.volinfo_query():
- continue
+ if self.start:
+ logging.debug("... crawl #%d done, took %.6f seconds" %
+ (self.crawls, time.time() - self.start))
+ self.start = time.time()
+ should_display_info = self.start - self.lastreport['time'] >= 60
+ if should_display_info:
+ logging.debug("%d crawls, %d turns",
+ self.crawls - self.lastreport['crawls'],
+ self.turns - self.lastreport['turns'])
+ self.lastreport.update(crawls=self.crawls,
+ turns=self.turns,
+ time=self.start)
t1 = time.time()
- if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+ if int(t1 - t0) >= gconf.get("replica-failover-interval"):
crawl = self.should_crawl()
t0 = t1
+ self.update_worker_remote_node()
if not crawl:
+ self.status.set_passive()
+ # bring up _this_ brick to the cluster stime
+ # which is min of cluster (but max of the replicas)
+ brick_stime = self.xtime('.', self.slave)
+ cluster_stime = self.master.server.aggregated.stime_mnt(
+ '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
+ logging.debug(lf("Crawl info",
+ cluster_stime=cluster_stime,
+ brick_stime=brick_stime))
+
+ if not isinstance(cluster_stime, int):
+ if brick_stime < cluster_stime:
+ self.slave.server.set_stime(
+ self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ self.upd_stime(cluster_stime)
+ # Purge all changelogs available in processing dir
+ # less than cluster_stime
+ proc_dir = os.path.join(self.tempdir,
+ ".processing")
+
+ if os.path.exists(proc_dir):
+ to_purge = [f for f in os.listdir(proc_dir)
+ if (f.startswith("CHANGELOG.") and
+ int(f.split('.')[-1]) <
+ cluster_stime[0])]
+ for f in to_purge:
+ os.remove(os.path.join(proc_dir, f))
+
time.sleep(5)
continue
- if self.start:
- logging.debug("... crawl #%d done, took %.6f seconds" % \
- (self.crawls, time.time() - self.start))
- self.start = t1
- should_display_info = self.start - self.lastreport['time'] >= 60
- if should_display_info:
- logging.info("%d crawls, %d turns",
- self.crawls - self.lastreport['crawls'],
- self.turns - self.lastreport['turns'])
- self.lastreport.update(crawls = self.crawls,
- turns = self.turns,
- time = self.start)
+
+ self.status.set_active()
self.crawl()
+
if oneshot:
return
time.sleep(self.sleep_interval)
- @classmethod
- def _checkpt_param(cls, chkpt, prm, xtimish=True):
- """use config backend to lookup a parameter belonging to
- checkpoint @chkpt"""
- cprm = getattr(gconf, 'checkpoint_' + prm, None)
- if not cprm:
- return
- chkpt_mapped, val = cprm.split(':', 1)
- if unescape(chkpt_mapped) != chkpt:
- return
- if xtimish:
- val = cls.deserialize_xtime(val)
- return val
-
- @classmethod
- def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
- """use config backend to store a parameter associated
- with checkpoint @chkpt"""
- if xtimish:
- val = cls.serialize_xtime(val)
- gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
- strftime("%Y-%m-%d %H:%M:%S")
+ strftime("%Y-%m-%d %H:%M:%S")
if len(tpair) > 1:
ts += '.' + str(tpair[1])
return ts
- def get_extra_info(self):
- str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
- str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
-
- self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
-
- str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
- str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
- str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
- str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
- str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
- str_info += "\0"
- logging.debug(str_info)
- return str_info
-
def _crawl_time_format(self, crawl_time):
# Ex: 5 years, 4 days, 20:23:10
years, days = divmod(crawl_time.days, 365.25)
years = int(years)
days = int(days)
- date=""
+ date = ""
m, s = divmod(crawl_time.seconds, 60)
h, m = divmod(m, 60)
@@ -584,103 +631,15 @@ class GMasterCommon(object):
if days != 0:
date += "%s %s " % (days, "day" if days == 1 else "days")
- date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+ date += "%s:%s:%s" % (string.zfill(h, 2),
+ string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt, tgt):
- """checkpoint service loop
-
- monitor and verify checkpoint status for @chkpt, and listen
- for incoming requests for whom we serve a pretty-formatted
- status report"""
- if not chkpt:
- # dummy loop for the case when there is no checkpt set
- while True:
- select([chan], [], [])
- conn, _ = chan.accept()
- conn.send(self.get_extra_info())
- conn.close()
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- while True:
- s,_,_ = select([chan], [], [], (not completed) and 5 or None)
- # either request made and we re-check to not
- # give back stale data, or we still hunting for completion
- if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
- # indexing has been reset since setting the checkpoint
- status = "is invalid"
- else:
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- raise GsyncdError("slave root directory is unaccessible (%s)",
- os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s became stale" % \
- (self.humantime(*completed), chkpt))
- completed = None
- gconf.confdata.delete('checkpoint-completed')
- if ncompleted and not completed: # just reaching completion
- completed = "%.6f" % time.time()
- self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
- completed = tuple(int(x) for x in completed.split('.'))
- logging.info("checkpoint %s completed" % chkpt)
- status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
- if s:
- conn = None
- try:
- conn, _ = chan.accept()
- try:
- conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
- except:
- exc = sys.exc_info()[1]
- if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
- exc.errno == EPIPE:
- logging.debug('checkpoint client disconnected')
- else:
- raise
- finally:
- if conn:
- conn.close()
-
- def start_checkpoint_thread(self):
- """prepare and start checkpoint service"""
- if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
- ):
- return
- chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
- try:
- os.unlink(state_socket)
- except:
- if sys.exc_info()[0] == OSError:
- pass
- chan.bind(state_socket)
- chan.listen(1)
- checkpt_tgt = None
- if gconf.checkpoint:
- checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
- (repr(checkpt_tgt), gconf.checkpoint))
- t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
- t.start()
- self.checkpoint_thread = t
-
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
- if self.jobtab.get(path) == None:
+ if self.jobtab.get(path) is None:
self.jobtab[path] = []
- self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
+ self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
def add_failjob(self, path, label):
"""invoke .add_job with a job that does nothing just fails"""
@@ -691,7 +650,7 @@ class GMasterCommon(object):
"""perform jobs registered for @path
Reset jobtab entry for @path,
- determine success as the conjuction of
+ determine success as the conjunction of
success of all the jobs. In case of
success, call .sendmark on @path
"""
@@ -701,7 +660,7 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
- if succeed and not args[0] == None:
+ if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -714,132 +673,348 @@ class GMasterCommon(object):
self.slave.server.setattr(path, adct)
self.set_slave_xtime(path, mark)
- @staticmethod
- def volinfo_state_machine(volinfo_state, volinfo_sys):
- """compute new volinfo_state from old one and incoming
- as of current system state, also indicating if there was a
- change regarding which volume mark is the authoritative one
-
- @volinfo_state, @volinfo_sys are pairs of volume mark dicts
- (foreign, native).
- Note this method is marked as static, ie. the computation is
- pure, without reliance on any excess implicit state. State
- transitions which are deemed as ambiguous or banned will raise
- an exception.
+class XCrawlMetadata(object):
+ def __init__(self, st_uid, st_gid, st_mode, st_atime, st_mtime):
+ self.st_uid = int(st_uid)
+ self.st_gid = int(st_gid)
+ self.st_mode = int(st_mode)
+ self.st_atime = float(st_atime)
+ self.st_mtime = float(st_mtime)
- """
- # store the value below "boxed" to emulate proper closures
- # (variables of the enclosing scope are available inner functions
- # provided they are no reassigned; mutation is OK).
- param = FreeObject(relax_mismatch = False, state_change = None, index=-1)
- def select_vi(vi0, vi):
- param.index += 1
- if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
- if not vi0 and not param.relax_mismatch:
- param.state_change = param.index
- # valid new value found; for the rest, we are graceful about
- # uuid mismatch
- param.relax_mismatch = True
- return vi
- if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
- # uuid mismatch for master candidate, bail out
- raise GsyncdError("aborting on uuid change from %s to %s" % \
- (vi0['uuid'], vi['uuid']))
- # fall back to old
- return vi0
- newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
- srep = lambda vi: vi and vi['uuid'][0:8]
- logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
- tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
- return newstate, param.state_change
class GMasterChangelogMixin(GMasterCommon):
+
""" changelog based change detection and syncing """
# index for change type and entry
IDX_START = 0
- IDX_END = 2
+ IDX_END = 2
+ UNLINK_ENTRY = 2
- POS_GFID = 0
- POS_TYPE = 1
- POS_ENTRY1 = 2
- POS_ENTRY2 = 3 # renames
+ POS_GFID = 0
+ POS_TYPE = 1
+ POS_ENTRY1 = -1
- _CL_TYPE_DATA_PFX = "D "
- _CL_TYPE_METADATA_PFX = "M "
- _CL_TYPE_ENTRY_PFX = "E "
+ TYPE_META = "M "
+ TYPE_GFID = "D "
+ TYPE_ENTRY = "E "
- TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
- TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
+ MAX_EF_RETRIES = 10
+ MAX_OE_RETRIES = 10
- # flat directory heirarchy for gfid based access
+ # flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
- def fallback_xsync(self):
- logging.info('falling back to xsync mode')
- gconf.configinterface.set('change-detector', 'xsync')
- selfkill()
-
- def setup_working_dir(self):
- workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
- logfile = os.path.join(workdir, 'changes.log')
- logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
- return (workdir, logfile)
+ CHANGELOG_CONN_RETRIES = 5
+
+ def init_fop_batch_stats(self):
+ self.batch_stats = {
+ "CREATE": 0,
+ "MKNOD": 0,
+ "UNLINK": 0,
+ "MKDIR": 0,
+ "RMDIR": 0,
+ "LINK": 0,
+ "SYMLINK": 0,
+ "RENAME": 0,
+ "SETATTR": 0,
+ "SETXATTR": 0,
+ "XATTROP": 0,
+ "DATA": 0,
+ "ENTRY_SYNC_TIME": 0,
+ "META_SYNC_TIME": 0,
+ "DATA_START_TIME": 0
+ }
+
+ def update_fop_batch_stats(self, ty):
+ if ty in ['FSETXATTR']:
+ ty = 'SETXATTR'
+ self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
+
+ def archive_and_purge_changelogs(self, changelogs):
+ # Creates tar file instead of tar.gz, since changelogs will
+ # be appended to existing tar. archive name is
+ # archive_<YEAR><MONTH>.tar
+ archive_name = "archive_%s.tar" % datetime.today().strftime(
+ gconf.get("changelog-archive-format"))
- def lstat(self, e):
try:
- return os.lstat(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- return ex.errno
+ tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+ archive_name),
+ "a")
+ except tarfile.ReadError:
+ tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+ archive_name),
+ "w")
+
+ for f in changelogs:
+ try:
+ f = os.path.basename(f)
+ tar.add(os.path.join(self.processed_changelogs_dir, f),
+ arcname=os.path.basename(f))
+ except:
+ exc = sys.exc_info()[1]
+ if ((isinstance(exc, OSError) or
+ isinstance(exc, IOError)) and exc.errno == ENOENT):
+ continue
+ else:
+ tar.close()
+ raise
+ tar.close()
+
+ for f in changelogs:
+ try:
+ f = os.path.basename(f)
+ os.remove(os.path.join(self.processed_changelogs_dir, f))
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ continue
+ else:
+ raise
+
+ def setup_working_dir(self):
+ workdir = os.path.join(gconf.get("working-dir"),
+ escape(rconf.args.local_path))
+ logging.debug('changelog working dir %s' % workdir)
+ return workdir
+
+ def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
+ for failure in failures:
+ st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+ if not isinstance(st, int):
+ num_failures += 1
+ logging.error(lf('%s FAILED' % log_prefix,
+ data=failure))
+ if failure[0]['op'] == 'MKDIR':
+ raise GsyncdError("The above directory failed to sync."
+ " Please fix it to proceed further.")
+
+ self.status.inc_value("failures", num_failures)
+
+ def fix_possible_entry_failures(self, failures, retry_count, entries):
+ pfx = gauxpfx()
+ fix_entry_ops = []
+ failures1 = []
+ remove_gfids = set()
+ for failure in failures:
+ if failure[2]['name_mismatch']:
+ pbname = failure[2]['slave_entry']
+ elif failure[2]['dst']:
+ pbname = failure[0]['entry1']
else:
- raise
+ pbname = failure[0]['entry']
+
+ op = failure[0]['op']
+ # name exists but gfid is different
+ if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
+ slave_gfid = failure[2]['slave_gfid']
+ st = lstat(os.path.join(pfx, slave_gfid))
+ # Takes care of scenarios with no hardlinks
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug(lf('Entry not present on master. Fixing gfid '
+ 'mismatch in slave. Deleting the entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ # Add deletion to fix_entry_ops list
+ if failure[2]['slave_isdir']:
+ fix_entry_ops.append(
+ edct('RMDIR',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ else:
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ remove_gfids.add(slave_gfid)
+ if op in ['RENAME']:
+ # If renamed gfid doesn't exists on master, remove
+ # rename entry and unlink src on slave
+ st = lstat(os.path.join(pfx, failure[0]['gfid']))
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug("Unlink source %s" % repr(failure))
+ remove_gfids.add(failure[0]['gfid'])
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # Takes care of scenarios of hardlinks/renames on master
+ elif not isinstance(st, int):
+ if matching_disk_gfid(slave_gfid, pbname):
+ # Safe to ignore the failure as master contains same
+ # file with same gfid. Remove entry from entries list
+ logging.debug(lf('Fixing gfid mismatch in slave. '
+ ' Safe to ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ remove_gfids.add(failure[0]['gfid'])
+ if op == 'RENAME':
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # The file exists on master but with different name.
+ # Probably renamed and got missed during xsync crawl.
+ elif failure[2]['slave_isdir']:
+ realpath = os.readlink(os.path.join(
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
+ dst_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ src_entry = pbname
+ logging.debug(lf('Fixing dir name/gfid mismatch in '
+ 'slave', retry_count=retry_count,
+ entry=repr(failure)))
+ if src_entry == dst_entry:
+ # Safe to ignore the failure as master contains
+ # same directory as in slave with same gfid.
+ # Remove the failure entry from entries list
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Safe to ignore, '
+ 'take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ rename_dict = edct('RENAME', gfid=slave_gfid,
+ entry=src_entry,
+ entry1=dst_entry, stat=st,
+ link=None)
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Renaming',
+ retry_count=retry_count,
+ entry=repr(rename_dict)))
+ fix_entry_ops.append(rename_dict)
+ else:
+ # A hardlink file exists with different name or
+ # renamed file exists and we are sure from
+ # matching_disk_gfid check that the entry doesn't
+ # exist with same gfid so we can safely delete on slave
+ logging.debug(lf('Fixing file gfid mismatch in slave. '
+ 'Hardlink/Rename Case. Deleting entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ elif failure[1] == ENOENT:
+ if op in ['RENAME']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
- # sync data
- def syncdata(self, datas):
- logging.debug('datas: %s' % (datas))
- for data in datas:
- logging.debug('candidate for syncing %s' % data)
- pb = self.syncer.add(data)
- timeA = datetime.now()
- def regjob(se, xte, pb):
- rv = pb.wait()
- if rv[0]:
- logging.debug('synced ' + se)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
-
- # cumulative statistics
- self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
- return True
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.debug(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
else:
- if rv[1] in [23, 24]:
- # stat to check if the file exist
- st = self.lstat(se)
- if isinstance(st, int):
- # file got unlinked in the interim
- return True
- logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
- self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
- if self.wait(self.FLAT_DIR_HIERARCHY, None):
- self.update_crawl_data()
- return True
+ logging.debug(lf('Fixing ENOENT error in slave. Create '
+ 'parent directory on slave.',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ realpath = os.readlink(os.path.join(rconf.args.local_path,
+ ".glusterfs",
+ pargfid[0:2],
+ pargfid[2:4],
+ pargfid))
+ dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ fix_entry_ops.append(
+ edct('MKDIR', gfid=pargfid, entry=dir_entry,
+ mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+ logging.debug("remove_gfids: %s" % repr(remove_gfids))
+ if remove_gfids:
+ for e in entries:
+ if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+ and e['gfid'] in remove_gfids:
+ logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+ e['skip_entry'] = True
+
+ if fix_entry_ops:
+ # Process deletions of entries whose gfids are mismatched
+ failures1 = self.slave.server.entry_ops(fix_entry_ops)
+
+ return (failures1, fix_entry_ops)
+
+ def handle_entry_failures(self, failures, entries):
+ retries = 0
+ pending_failures = False
+ failures1 = []
+ failures2 = []
+ entry_ops1 = []
+ entry_ops2 = []
+
+ if failures:
+ pending_failures = True
+ failures1 = failures
+ entry_ops1 = entries
+
+ while pending_failures and retries < self.MAX_EF_RETRIES:
+ retries += 1
+ (failures2, entry_ops2) = self.fix_possible_entry_failures(
+ failures1, retries, entry_ops1)
+ if not failures2:
+ pending_failures = False
+ logging.info(lf('Successfully fixed entry ops with gfid '
+ 'mismatch', retry_count=retries))
+ else:
+ pending_failures = True
+ failures1 = failures2
+ entry_ops1 = entry_ops2
- def process_change(self, change, done):
- clist = []
+ if pending_failures:
+ for failure in failures1:
+ logging.error("Failed to fix entry ops %s", repr(failure))
+
+ def process_change(self, change, done, retry):
+ pfx = gauxpfx()
+ clist = []
entries = []
- purges = set()
- links = set()
+ meta_gfid = set()
datas = set()
- pfx = gauxpfx()
+
+ change_ts = change.split(".")[-1]
+
+ # Ignore entry ops which are already processed in Changelog modes
+ ignore_entry_ops = False
+ entry_stime = None
+ data_stime = None
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime = self.get_entry_stime()
+ data_stime = self.get_data_stime()
+
+ if entry_stime is not None and data_stime is not None:
+ # if entry_stime is not None but data_stime > entry_stime
+ # This situation is caused by the stime update of Passive worker
+ # Consider data_stime in this case.
+ if data_stime[0] > entry_stime[0]:
+ entry_stime = data_stime
+
+ # Compare the entry_stime with changelog file suffix
+ # if changelog time is less than entry_stime then ignore
+ if int(change_ts) <= entry_stime[0]:
+ ignore_entry_ops = True
+
try:
f = open(change, "r")
clist = f.readlines()
@@ -847,137 +1022,631 @@ class GMasterChangelogMixin(GMasterCommon):
except IOError:
raise
- def edct(op, **ed):
- dct = {}
- dct['op'] = op
- for k in ed:
- if k == 'stat':
- st = ed[k]
- dst = dct['stat'] = {}
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
- else:
- dct[k] = ed[k]
- return dct
for e in clist:
e = e.strip()
- et = e[self.IDX_START:self.IDX_END]
- ec = e[self.IDX_END:].split(' ')
- if et in self.TYPE_ENTRY:
+ et = e[self.IDX_START:self.IDX_END] # entry type
+ ec = e[self.IDX_END:].split(' ') # rest of the bits
+
+ # skip ENTRY operation if hot tier brick
+ if self.name == 'live_changelog' or \
+ self.name == 'history_changelog':
+ if rconf.args.is_hottier and et == self.TYPE_ENTRY:
+ logging.debug(lf('skip ENTRY op if hot tier brick',
+ op=ec[self.POS_TYPE]))
+ continue
+
+ # Data and Meta operations are decided while parsing
+ # UNLINK/RMDIR/MKNOD except that case ignore all the other
+ # entry ops if ignore_entry_ops is True.
+ # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end
+ if ignore_entry_ops and et == self.TYPE_ENTRY and \
+ ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
+ continue
+
+ if et == self.TYPE_ENTRY:
+ # extract information according to the type of
+ # the entry operation. create(), mkdir() and mknod()
+ # have mode, uid, gid information in the changelog
+ # itself, so no need to stat()...
ty = ec[self.POS_TYPE]
- en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+
+ self.update_fop_batch_stats(ec[self.POS_TYPE])
+
+ # PARGFID/BNAME
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1]))
+ # GFID of the entry
gfid = ec[self.POS_GFID]
- # definitely need a better way bucketize entry ops
+
if ty in ['UNLINK', 'RMDIR']:
- entries.append(edct(ty, gfid=gfid, entry=en))
- purges.update([os.path.join(pfx, gfid)])
- continue
- if not ty == 'RENAME':
+ # The index of PARGFID/BNAME for UNLINK, RMDIR
+ # is no more the last index. It varies based on
+ # changelog.capture-del-path is enabled or not.
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.UNLINK_ENTRY]))
+
+ # Remove from DATA list, so that rsync will
+ # not fail
+ pt = os.path.join(pfx, ec[0])
+ st = lstat(pt)
+ if pt in datas and isinstance(st, int):
+ # file got unlinked, May be historical Changelog
+ datas.remove(pt)
+
+ if ty in ['RMDIR'] and not isinstance(st, int):
+ logging.info(lf('Ignoring rmdir. Directory present in '
+ 'master', gfid=gfid, pgfid_bname=en))
+ continue
+
+ if not gconf.get("ignore-deletes"):
+ if not ignore_entry_ops:
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ # Special case: record mknod as link
+ if ty in ['MKNOD']:
+ mode = int(ec[2])
+ if mode & 0o1000:
+ # Avoid stat'ing the file as it
+ # may be deleted in the interim
+ st = FreeObject(st_mode=int(ec[2]),
+ st_uid=int(ec[3]),
+ st_gid=int(ec[4]),
+ st_atime=0,
+ st_mtime=0)
+
+ # So, it may be deleted, but still we are
+ # append LINK? Because, the file will be
+ # CREATED if source not exists.
+ entries.append(edct('LINK', stat=st, entry=en,
+ gfid=gfid))
+
+ # Here, we have the assumption that only
+ # tier-gfid.linkto causes this mknod. Add data
+ datas.add(os.path.join(pfx, ec[0]))
+ continue
+
+ # stat info. present in the changelog itself
+ entries.append(edct(ty, gfid=gfid, entry=en,
+ mode=int(ec[2]),
+ uid=int(ec[3]), gid=int(ec[4])))
+ elif ty == "RENAME":
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ st = {}
+
+ rl = None
+ if st and stat.S_ISLNK(st.st_mode):
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ rl = None
+
+ e1 = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
+ stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If data is not followed by rename, this
+ # remains zero byte file on slave. Hence add data entry
+ # for renames
+ datas.add(os.path.join(pfx, gfid))
+ else:
+ # stat() to get mode and other information
+ if not matching_disk_gfid(gfid, en):
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
+ continue
+
go = os.path.join(pfx, gfid)
- st = self.lstat(go)
+ st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
continue
- if ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- elif ty == 'LINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- links.update([os.path.join(pfx, gfid)])
- elif ty == 'SYMLINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
- elif ty == 'RENAME':
- e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
- entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
+
+ if ty == 'LINK':
+ rl = None
+ if st and stat.S_ISLNK(st.st_mode):
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ rl = None
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
+ link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If data is not
+ # followed by link, this remains zero byte file on
+ # slave. Hence add data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
+ elif ty == 'SYMLINK':
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ continue
+
+ entries.append(
+ edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ else:
+ logging.warn(lf('ignoring op',
+ gfid=gfid,
+ type=ty))
+ elif et == self.TYPE_GFID:
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
else:
- pass
- elif et in self.TYPE_GFID:
- da = os.path.join(pfx, ec[0])
- st = self.lstat(da)
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % da)
- continue
- datas.update([da])
+ datas.add(os.path.join(pfx, ec[0]))
+ elif et == self.TYPE_META:
+ self.update_fop_batch_stats(ec[self.POS_TYPE])
+ if ec[1] == 'SETATTR': # only setattr's for now...
+ if len(ec) == 5:
+ # In xsync crawl, we already have stat data
+ # avoid doing stat again
+ meta_gfid.add((os.path.join(pfx, ec[0]),
+ XCrawlMetadata(st_uid=ec[2],
+ st_gid=ec[3],
+ st_mode=ec[4],
+ st_atime=ec[5],
+ st_mtime=ec[6])))
+ else:
+ meta_gfid.add((os.path.join(pfx, ec[0]), ))
+ elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
+ # To sync xattr/acls use rsync/tar, --xattrs and --acls
+ # switch to rsync and tar
+ if not gconf.get("sync-method") == "tarssh" and \
+ (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
+ datas.add(os.path.join(pfx, ec[0]))
+ else:
+ logging.warn(lf('got invalid fop type',
+ type=et))
logging.debug('entries: %s' % repr(entries))
+
+ # Increment counters for Status
+ self.files_in_batch += len(datas)
+ self.status.inc_value("data", len(datas))
+
+ self.batch_stats["DATA"] += self.files_in_batch - \
+ self.batch_stats["SETXATTR"] - \
+ self.batch_stats["XATTROP"]
+
+ entry_start_time = time.time()
# sync namespace
- if (entries):
- self.slave.server.entry_ops(entries)
+ if entries and not ignore_entry_ops:
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+
+ failures = self.slave.server.entry_ops(entries)
+
+ if gconf.get("gfid-conflict-resolution"):
+ count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
+ while failures and count < self.MAX_OE_RETRIES:
+ count += 1
+ self.handle_entry_failures(failures, entries)
+ logging.info(lf('Retry original entries', count=count))
+ failures = self.slave.server.entry_ops(entries)
+ if not failures:
+ logging.info("Successfully fixed all entry ops with "
+ "gfid mismatch")
+ break
+
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.status.dec_value("entry", len(entries))
+
+ # Update Entry stime in Brick Root only in case of Changelog mode
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime_to_update = (int(change_ts) - 1, 0)
+ self.upd_entry_stime(entry_stime_to_update)
+ self.status.set_field("last_synced_entry",
+ entry_stime_to_update[0])
+
+ self.batch_stats["ENTRY_SYNC_TIME"] += time.time() - entry_start_time
+
+ if ignore_entry_ops:
+ # Book keeping, to show in logs the range of Changelogs skipped
+ self.num_skipped_entry_changelogs += 1
+ if self.skipped_entry_changelogs_first is None:
+ self.skipped_entry_changelogs_first = change_ts
+
+ self.skipped_entry_changelogs_last = change_ts
+
+ meta_start_time = time.time()
+ # sync metadata
+ if meta_gfid:
+ meta_entries = []
+ for go in meta_gfid:
+ if len(go) > 1:
+ st = go[1]
+ else:
+ st = lstat(go[0])
+ if isinstance(st, int):
+ logging.debug(lf('file got purged in the interim',
+ file=go[0]))
+ continue
+ meta_entries.append(edct('META', go=go[0], stat=st))
+ if meta_entries:
+ self.status.inc_value("meta", len(meta_entries))
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(meta_entries))
+
+ self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
+
+ if self.batch_stats["DATA_START_TIME"] == 0:
+ self.batch_stats["DATA_START_TIME"] = time.time()
+
# sync data
- if self.syncdata(datas - (purges - links)):
- if done:
- self.master.server.changelog_done(change)
- return True
+ if datas:
+ self.a_syncdata(datas)
+ self.datas_in_batch.update(datas)
def process(self, changes, done=1):
- for change in changes:
- times = 0
- while True:
- times += 1
- logging.debug('processing change %s [%d time(s)]' % (change, times))
- if self.process_change(change, done):
- break
- # it's either entry_ops() or Rsync that failed to do it's
- # job. Mostly it's entry_ops() [which currently has a problem
- # of failing to create an entry but failing to return an errno]
- # Therefore we do not know if it's either Rsync or the freaking
- # entry_ops() that failed... so we retry the _whole_ changelog
- # again.
- # TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelog: %s' % change)
- time.sleep(0.5)
- self.turns += 1
+ tries = 0
+ retry = False
+ self.unlinked_gfids = set()
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
+ # Error log disabled till the last round
+ self.syncer.disable_errorlog()
+ self.skipped_entry_changelogs_first = None
+ self.skipped_entry_changelogs_last = None
+ self.num_skipped_entry_changelogs = 0
+ self.batch_start_time = time.time()
+ self.init_fop_batch_stats()
- def upd_stime(self, stime):
- if stime:
- self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+ while True:
+ # first, fire all changelog transfers in parallel. entry and
+ # metadata are performed synchronously, therefore in serial.
+ # However at the end of each changelog, data is synchronized
+ # with syncdata_async() - which means it is serial w.r.t
+ # entries/metadata of that changelog but happens in parallel
+ # with data of other changelogs.
+
+ if retry:
+ if tries == (gconf.get("max-rsync-retries") - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
+
+ # Remove Unlinked GFIDs from Queue
+ for unlinked_gfid in self.unlinked_gfids:
+ if unlinked_gfid in self.datas_in_batch:
+ self.datas_in_batch.remove(unlinked_gfid)
+
+ # Retry only Sync. Do not retry entry ops
+ if self.datas_in_batch:
+ self.a_syncdata(self.datas_in_batch)
+ else:
+ for change in changes:
+ logging.debug(lf('processing change',
+ changelog=change))
+ self.process_change(change, done, retry)
+ if not retry:
+ # number of changelogs processed in the batch
+ self.turns += 1
+
+ # Now we wait for all the data transfers fired off in the above
+ # step to complete. Note that this is not ideal either. Ideally
+ # we want to trigger the entry/meta-data transfer of the next
+ # batch while waiting for the data transfer of the current batch
+ # to finish.
+
+ # Note that the reason to wait for the data transfer (vs doing it
+ # completely in the background and call the changelog_done()
+ # asynchronously) is because this waiting acts as a "backpressure"
+ # and prevents a spiraling increase of wait stubs from consuming
+ # unbounded memory and resources.
+
+ # update the slave's time with the timestamp of the _last_
+ # changelog file time suffix. Since, the changelog prefix time
+ # is the time when the changelog was rolled over, introduce a
+ # tolerance of 1 second to counter the small delta b/w the
+ # marker update and gettimeofday().
+ # NOTE: this is only for changelog mode, not xsync.
+
+ # @change is the last changelog (therefore max time for this batch)
+ if self.syncdata_wait():
+ self.unlinked_gfids = set()
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ list(map(self.changelog_done_func, changes))
+ self.archive_and_purge_changelogs(changes)
+
+ # Reset Data counter after sync
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
+ break
+
+ # We do not know which changelog transfer failed, retry everything.
+ retry = True
+ tries += 1
+ if tries == gconf.get("max-rsync-retries"):
+ logging.error(lf('changelogs could not be processed '
+ 'completely - moving on...',
+ files=list(map(os.path.basename, changes))))
+
+ # Reset data counter on failure
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
+
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ list(map(self.changelog_done_func, changes))
+ self.archive_and_purge_changelogs(changes)
+ break
+ # it's either entry_ops() or Rsync that failed to do it's
+ # job. Mostly it's entry_ops() [which currently has a problem
+ # of failing to create an entry but failing to return an errno]
+ # Therefore we do not know if it's either Rsync or the freaking
+ # entry_ops() that failed... so we retry the _whole_ changelog
+ # again.
+ # TODO: remove entry retries when it's gets fixed.
+ logging.warn(lf('incomplete sync, retrying changelogs',
+ files=list(map(os.path.basename, changes))))
+
+ # Reset the Data counter before Retry
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
+ self.init_fop_batch_stats()
+ time.sleep(0.5)
+
+ # Log the Skipped Entry ops range if any
+ if self.skipped_entry_changelogs_first is not None and \
+ self.skipped_entry_changelogs_last is not None:
+ logging.info(lf("Skipping already processed entry ops",
+ from_changelog=self.skipped_entry_changelogs_first,
+ to_changelog=self.skipped_entry_changelogs_last,
+ num_changelogs=self.num_skipped_entry_changelogs))
+
+ # Log Current batch details
+ if changes:
+ logging.info(
+ lf("Entry Time Taken",
+ UNL=self.batch_stats["UNLINK"],
+ RMD=self.batch_stats["RMDIR"],
+ CRE=self.batch_stats["CREATE"],
+ MKN=self.batch_stats["MKNOD"],
+ MKD=self.batch_stats["MKDIR"],
+ REN=self.batch_stats["RENAME"],
+ LIN=self.batch_stats["LINK"],
+ SYM=self.batch_stats["SYMLINK"],
+ duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
+
+ logging.info(
+ lf("Data/Metadata Time Taken",
+ SETA=self.batch_stats["SETATTR"],
+ meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
+ SETX=self.batch_stats["SETXATTR"],
+ XATT=self.batch_stats["XATTROP"],
+ DATA=self.batch_stats["DATA"],
+ data_duration="%.4f" % (
+ time.time() - self.batch_stats["DATA_START_TIME"])))
+
+ logging.info(
+ lf("Batch Completed",
+ mode=self.name,
+ duration="%.4f" % (time.time() - self.batch_start_time),
+ changelog_start=changes[0].split(".")[-1],
+ changelog_end=changes[-1].split(".")[-1],
+ num_changelogs=len(changes),
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
+
+ def upd_entry_stime(self, stime):
+ self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
+ self.uuid,
+ stime)
+
+ def upd_stime(self, stime, path=None):
+ if not path:
+ path = self.FLAT_DIR_HIERARCHY
+ if not stime == URXTIME:
+ self.sendmark(path, stime)
+
+ # Update last_synced_time in status file based on stime
+ # only update stime if stime xattr set to Brick root
+ if path == self.FLAT_DIR_HIERARCHY:
+ chkpt_time = gconf.getr("checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(stime, checkpoint_time)
+
+ def update_worker_remote_node(self):
+ node = rconf.args.resource_remote
+ node_data = node.split("@")
+ node = node_data[-1]
+ remote_node_ip, _ = host_brick_split(node)
+ self.status.set_slave_node(remote_node_ip)
+
+ def changelogs_batch_process(self, changes):
+ changelogs_batches = []
+ current_size = 0
+ for c in changes:
+ si = os.lstat(c).st_size
+ if (si + current_size) > gconf.get("changelog-batch-size"):
+ # Create new batch if single Changelog file greater than
+ # Max Size! or current batch size exceeds Max size
+ changelogs_batches.append([c])
+ current_size = si
+ else:
+ # Append to last batch, if No batches available Create one
+ current_size += si
+ if not changelogs_batches:
+ changelogs_batches.append([c])
+ else:
+ changelogs_batches[-1].append(c)
+
+ for batch in changelogs_batches:
+ logging.debug(lf('processing changes',
+ batch=batch))
+ self.process(batch)
def crawl(self):
+ self.status.set_worker_crawl_status("Changelog Crawl")
changes = []
- try:
- self.master.server.changelog_scan()
- self.crawls += 1
- except OSError:
- self.fallback_xsync()
- changes = self.master.server.changelog_getchanges()
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ data_stime = self.get_data_stime()
+
+ libgfchangelog.scan()
+ self.crawls += 1
+ changes = libgfchangelog.getchanges()
if changes:
- xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
- if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
- self.upd_stime(xtl)
+ if data_stime:
+ logging.info(lf("slave's time",
+ stime=data_stime))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < data_stime[0]]
+ for pr in processed:
+ logging.debug(
+ lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
+ self.changelog_done_func(pr)
+ changes.remove(pr)
+ self.archive_and_purge_changelogs(processed)
+
+ self.changelogs_batch_process(changes)
+
+ def register(self, register_time, status):
+ self.sleep_interval = gconf.get("change-interval")
+ self.changelog_done_func = libgfchangelog.done
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
+ ".processed")
+ self.name = "live_changelog"
+ self.status = status
+
+
+class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
+ def register(self, register_time, status):
+ self.changelog_register_time = register_time
+ self.history_crawl_start_time = register_time
+ self.changelog_done_func = libgfchangelog.history_done
+ self.history_turns = 0
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
+ ".history/.processed")
+ self.name = "history_changelog"
+ self.status = status
+
+ def crawl(self):
+ self.history_turns += 1
+ self.status.set_worker_crawl_status("History Crawl")
+ data_stime = self.get_data_stime()
+
+ end_time = int(time.time())
+
+ #as start of historical crawl marks Geo-rep worker restart
+ if gconf.get("ignore-deletes"):
+ logging.info(lf('ignore-deletes config option is set',
+ stime=data_stime))
+
+ logging.info(lf('starting history crawl',
+ turns=self.history_turns,
+ stime=data_stime,
+ etime=end_time,
+ entry_stime=self.get_entry_stime()))
+
+ if not data_stime or data_stime == URXTIME:
+ raise NoStimeAvailable()
+
+ # Changelogs backend path is hardcoded as
+ # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
+ # location then consuming history will not work(Known issue as of now)
+ changelog_path = os.path.join(rconf.args.local_path,
+ ".glusterfs/changelogs")
+ ret, actual_end = libgfchangelog.history_changelog(
+ changelog_path,
+ data_stime[0],
+ end_time,
+ gconf.get("sync-jobs"))
+
+ # scan followed by getchanges till scan returns zero.
+ # history_scan() is blocking call, till it gets the number
+ # of changelogs to process. Returns zero when no changelogs
+ # to be processed. returns positive value as number of changelogs
+ # to be processed, which will be fetched using
+ # history_getchanges()
+ while libgfchangelog.history_scan() > 0:
+ self.crawls += 1
+
+ changes = libgfchangelog.history_getchanges()
+ if changes:
+ if data_stime:
+ logging.info(lf("slave's time",
+ stime=data_stime))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < data_stime[0]]
+ for pr in processed:
+ logging.debug(lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
+ self.changelog_done_func(pr)
+ changes.remove(pr)
+
+ self.changelogs_batch_process(changes)
+
+ history_turn_time = int(time.time()) - self.history_crawl_start_time
+
+ logging.info(lf('finished history crawl',
+ endtime=actual_end,
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
+
+ # If TS returned from history_changelog is < register_time
+ # then FS crawl may be required, since history is only available
+ # till TS returned from history_changelog
+ if actual_end < self.changelog_register_time:
+ if self.history_turns < 2:
+ sleep_time = 1
+ if history_turn_time < CHANGELOG_ROLLOVER_TIME:
+ sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
+ time.sleep(sleep_time)
+ self.history_crawl_start_time = int(time.time())
+ self.crawl()
+ else:
+ # This exception will be caught in resource.py and
+ # fallback to xsync for the small gap.
+ raise PartialHistoryAvailable(str(actual_end))
- def register(self):
- (workdir, logfile) = self.setup_working_dir()
- self.sleep_interval = int(gconf.change_interval)
- # register with the changelog library
- try:
- # 9 == log level (DEBUG)
- # 5 == connection retries
- self.master.server.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- except OSError:
- self.fallback_xsync()
- # control should not reach here
- raise
class GMasterXsyncMixin(GMasterChangelogMixin):
- """
+ """
This crawl needs to be xtime based (as of now
- it's not. this is beacuse we generate CHANGELOG
+ it's not. this is because we generate CHANGELOG
file during each crawl which is then processed
by process_change()).
For now it's used as a one-shot initial sync
mechanism and only syncs directories, regular
- files and symlinks.
+ files, hardlinks and symlinks.
"""
- def register(self):
+ XSYNC_MAX_ENTRIES = 1 << 13
+
+ def register(self, register_time=None, status=None):
+ self.status = status
+ self.counter = 0
+ self.comlist = []
+ self.stimes = []
self.sleep_interval = 60
- self.tempdir = self.setup_working_dir()[0]
+ self.tempdir = self.setup_working_dir()
+ logging.info(lf('Working dir',
+ path=self.tempdir))
self.tempdir = os.path.join(self.tempdir, 'xsync')
- logging.info('xsync temp directory: %s' % self.tempdir)
+ self.processed_changelogs_dir = self.tempdir
+ self.name = "xsync"
try:
os.makedirs(self.tempdir)
except OSError:
@@ -986,250 +1655,248 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
pass
else:
raise
+ # Purge stale unprocessed xsync changelogs
+ for f in os.listdir(self.tempdir):
+ if f.startswith("XSYNC-CHANGELOG"):
+ os.remove(os.path.join(self.tempdir, f))
+
+
+ def crawl(self):
+ """
+ event dispatcher thread
+
+ this thread dispatches either changelog or synchronizes stime.
+ additionally terminates itself on receiving a 'finale' event
+ """
+ def Xsyncer():
+ self.Xcrawl()
+ t = Thread(target=Xsyncer)
+ t.start()
+ logging.info(lf('starting hybrid crawl',
+ stime=self.get_data_stime()))
+ self.status.set_worker_crawl_status("Hybrid Crawl")
+ while True:
+ try:
+ item = self.comlist.pop(0)
+ if item[0] == 'finale':
+ logging.info(lf('finished hybrid crawl',
+ stime=self.get_data_stime()))
+ break
+ elif item[0] == 'xsync':
+ logging.info(lf('processing xsync changelog',
+ path=item[1]))
+ self.process([item[1]], 0)
+ self.archive_and_purge_changelogs([item[1]])
+ elif item[0] == 'stime':
+ logging.debug(lf('setting slave time',
+ time=item[1]))
+ self.upd_stime(item[1][1], item[1][0])
+ else:
+ logging.warn(lf('unknown tuple in comlist',
+ entry=item))
+ except IndexError:
+ time.sleep(1)
def write_entry_change(self, prefix, data=[]):
+ if not getattr(self, "fh", None):
+ self.open()
+
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
def open(self):
try:
- self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+ self.xsync_change = os.path.join(
+ self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
self.fh = open(self.xsync_change, 'w')
except IOError:
raise
def close(self):
- self.fh.close()
+ if getattr(self, "fh", None):
+ self.fh.flush()
+ os.fsync(self.fh.fileno())
+ self.fh.close()
+ self.fh = None
def fname(self):
return self.xsync_change
- def crawl(self, path='.', xtr=None, done=0):
- """ generate a CHANGELOG file consumable by process_change """
+ def put(self, mark, item):
+ self.comlist.append((mark, item))
+
+ def sync_xsync(self, last):
+ """schedule a processing of changelog"""
+ self.close()
+ if self.counter > 0:
+ self.put('xsync', self.fname())
+ self.counter = 0
+ if not last:
+ time.sleep(1) # make sure changelogs are 1 second apart
+
+ def sync_stime(self, stime=None, last=False):
+ """schedule a stime synchronization"""
+ if stime:
+ self.put('stime', stime)
+ if last:
+ self.put('finale', None)
+
+ def sync_done(self, stime=[], last=False):
+ self.sync_xsync(last)
+ if stime:
+ # Send last as True only for last stime entry
+ for st in stime[:-1]:
+ self.sync_stime(st, False)
+
+ if stime and stime[-1]:
+ self.sync_stime(stime[-1], last)
+
+ def is_sticky(self, path, mo):
+ """check for DHTs linkto sticky bit file"""
+ sticky = False
+ if mo & 0o1000:
+ sticky = self.master.server.linkto_check(path)
+ return sticky
+
+ def Xcrawl(self, path='.', xtr_root=None):
+ """
+ generate a CHANGELOG file consumable by process_change.
+
+ slave's xtime (stime) is _cached_ for comparisons across
+ the filesystem tree, but set after directory synchronization.
+ """
if path == '.':
- self.open()
self.crawls += 1
- if not xtr:
+ if not xtr_root:
# get the root stime and use it for all comparisons
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- if xtr != ENOENT:
- raise GsyncdError('slave is corrupt')
- xtr = self.minus_infinity
+ xtr_root = self.xtime('.', self.slave)
+ if isinstance(xtr_root, int):
+ if xtr_root != ENOENT:
+ logging.warn(lf("slave cluster not returning the "
+ "xtime for root",
+ error=xtr_root))
+ xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
- if xtr == xtl:
+ logging.warn("master cluster's xtime not found")
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ logging.warn(lf("slave cluster not returning the "
+ "xtime for dir",
+ path=path,
+ error=xtr))
+ xtr = self.minus_infinity
+ xtr = max(xtr, xtr_root)
+ zero_zero = (0, 0)
+ if xtr_root == zero_zero:
+ xtr = self.minus_infinity
+ if not self.need_sync(path, xtl, xtr):
if path == '.':
- self.close()
+ self.sync_done([(path, xtl)], True)
return
self.xtime_reversion_hook(path, xtl, xtr)
logging.debug("entering " + path)
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
- logging.warn('skipping directory %s' % (path))
+ logging.warn(lf('skipping directory',
+ path=path))
for e in dem:
bname = e
e = os.path.join(path, e)
- st = self.lstat(e)
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ logging.warn(lf("irregular xtime",
+ path=e,
+ error=errno.errorcode[xte]))
+ continue
+ if not self.need_sync(e, xte, xtr):
+ continue
+ st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim..' % e)
+ logging.warn(lf('got purged in the interim',
+ path=e))
+ continue
+ if self.is_sticky(e, st.st_mode):
+ logging.debug(lf('ignoring sticky bit file',
+ path=e))
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % (e))
- continue
- xte = self.xtime(e)
- if isinstance(xte, int):
- raise GsyncdError('master is corrupt')
- if not self.need_sync(e, xte, xtr):
+ logging.warn(lf('skipping entry',
+ path=e))
continue
mo = st.st_mode
+ self.counter += 1 if ((stat.S_ISDIR(mo) or
+ stat.S_ISLNK(mo) or
+ stat.S_ISREG(mo))) else 0
+ if self.counter == self.XSYNC_MAX_ENTRIES:
+ self.sync_done(self.stimes, False)
+ self.stimes = []
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
- self.crawl(e, xtr)
- elif stat.S_ISREG(mo):
- self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
- self.write_entry_change("D", [gfid])
+ self.write_entry_change("E",
+ [gfid, 'MKDIR', str(mo),
+ str(0), str(0), escape_space_newline(
+ os.path.join(pargfid, bname))])
+ self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
+ str(st.st_gid), str(st.st_mode),
+ str(st.st_atime),
+ str(st.st_mtime)])
+ self.Xcrawl(e, xtr_root)
+ stime_to_update = xte
+ # Live Changelog Start time indicates that from that time
+ # onwards Live changelogs are available. If we update stime
+ # greater than live_changelog_start time then Geo-rep will
+ # skip those changelogs as already processed. But Xsync
+ # actually failed to sync the deletes and Renames. Update
+ # stime as min(Live_changelogs_time, Actual_stime) When it
+ # switches to Changelog mode, it syncs Deletes and Renames.
+ if self.live_changelog_start_time:
+ stime_to_update = min(self.live_changelog_start_time, xte)
+ self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
- else:
- logging.info('ignoring %s' % e)
- if path == '.':
- logging.info('processing xsync changelog %s' % self.fname())
- self.close()
- self.process([self.fname()], done)
- self.upd_stime(xtl)
-
-class GMasterXtimeMixin(GMasterCommon):
- """ xtime based change detection and syncing """
-
- def register(self):
- pass
-
- def crawl(self, path='.', xtl=None):
- """crawling...
-
- Standing around
- All the right people
- Crawling
- Tennis on Tuesday
- The ladder is long
- It is your nature
- You've gotta suntan
- Football on Sunday
- Society boy
-
- Recursively walk the master side tree and check if updates are
- needed due to xtime differences. One invocation of crawl checks
- children of @path and do a recursive enter only on
- those directory children where there is an update needed.
-
- Way of updates depend on file type:
- - for symlinks, sync them directy and synchronously
- - for regular children, register jobs for @path (cf. .add_job) to start
- and wait on their rsync
- - for directory children, register a job for @path which waits (.wait)
- on jobs for the given child
- (other kind of filesystem nodes are not considered)
-
- Those slave side children which do not exist on master are simply
- purged (see Server.purge).
-
- Behavior is fault tolerant, synchronization is adaptive: if some action fails,
- just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
- the .sendmark on @path, so when the next crawl will arrive to @path it will not
- see it as up-to-date and will try to sync it again. While this semantics can be
- supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
- the ultimate reason which excludes other possibilities is simply transience: we cannot
- assert that the file systems (master / slave) underneath do not change and actions
- taken upon some condition will not lose their context by the time they are performed.
- """
- logging.debug("entering " + path)
- if not xtl:
- xtl = self.xtime(path)
- if isinstance(xtl, int):
- self.add_failjob(path, 'no-local-node')
- return
- xtr = self.xtime(path, self.slave)
- if isinstance(xtr, int):
- if xtr != ENOENT:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- except OSError:
- self.add_failjob(path, 'no-remote-node')
- return
- xtr = self.minus_infinity
- else:
- self.xtime_reversion_hook(path, xtl, xtr)
- if xtl == xtr:
- if path == '.' and self.change_seen:
- self.turns += 1
- self.change_seen = False
- if self.total_turns:
- logging.info("finished turn #%s/%s" % \
- (self.turns, self.total_turns))
- if self.turns == self.total_turns:
- logging.info("reached turn limit")
- self.terminate = True
- return
- if path == '.':
- self.change_seen = True
- try:
- dem = self.master.server.entries(path)
- except OSError:
- self.add_failjob(path, 'local-entries-fail')
- return
- random.shuffle(dem)
- try:
- des = self.slave.server.entries(path)
- except OSError:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- des = self.slave.server.entries(path)
- except OSError:
- self.add_failjob(path, 'remote-entries-fail')
- return
- dd = set(des) - set(dem)
- if dd:
- self.purge_missing(path, dd)
- chld = []
- for e in dem:
- e = os.path.join(path, e)
- xte = self.xtime(e)
- if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
- elif self.need_sync(e, xte, xtr):
- chld.append((e, xte))
- def indulgently(e, fnc, blame=None):
- if not blame:
- blame = path
- try:
- return fnc(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- logging.warn("salvaged ENOENT for " + e)
- self.add_failjob(blame, 'by-indulgently')
- return False
- else:
- raise
- for e, xte in chld:
- st = indulgently(e, lambda e: os.lstat(e))
- if st == False:
- continue
-
- mo = st.st_mode
- adct = {'own': (st.st_uid, st.st_gid)}
- if stat.S_ISLNK(mo):
- if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
- continue
- self.sendmark(e, xte, adct)
+ self.write_entry_change(
+ "E", [gfid, 'SYMLINK', escape_space_newline(
+ os.path.join(pargfid, bname))])
elif stat.S_ISREG(mo):
- logging.debug("syncing %s ..." % e)
- pb = self.syncer.add(e)
- timeA = datetime.now()
- def regjob(e, xte, pb):
- if pb.wait()[0]:
- logging.debug("synced " + e)
- self.sendmark_regular(e, xte)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
- self.update_crawl_data()
- return True
- else:
- logging.warn("failed to sync " + e)
- self.add_job(path, 'reg', regjob, e, xte, pb)
- elif stat.S_ISDIR(mo):
- adct['mode'] = mo
- if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
- self.crawl(e, xte),
- True)[-1], blame=e) == False:
- continue
- else:
- # ignore fifos, sockets and special files
- pass
+ nlink = st.st_nlink
+ nlink -= 1 # fixup backend stat link count
+ # if a file has a hardlink, create a Changelog entry as
+ # 'LINK' so the slave side will decide if to create the
+ # new entry, or to create link.
+ if nlink == 1:
+ self.write_entry_change("E",
+ [gfid, 'MKNOD', str(mo),
+ str(0), str(0),
+ escape_space_newline(
+ os.path.join(
+ pargfid, bname))])
+ else:
+ self.write_entry_change(
+ "E", [gfid, 'LINK', escape_space_newline(
+ os.path.join(pargfid, bname))])
+ self.write_entry_change("D", [gfid])
if path == '.':
- self.wait(path, xtl)
+ stime_to_update = xtl
+ if self.live_changelog_start_time:
+ stime_to_update = min(self.live_changelog_start_time, xtl)
+ self.stimes.append((path, stime_to_update))
+ self.sync_done(self.stimes, True)
class BoxClosedErr(Exception):
pass
+
class PostBox(list):
+
"""synchronized collection for storing things thought of as "requests" """
def __init__(self, *a):
list.__init__(self, *a)
# too bad Python stdlib does not have read/write locks...
- # it would suffivce to grab the lock in .append as reader, in .close as writer
+ # it would suffivce to grab the lock in .append as reader, in .close as
+ # writer
self.lever = Condition()
self.open = True
self.done = False
@@ -1264,7 +1931,9 @@ class PostBox(list):
self.open = False
self.lever.release()
+
class Syncer(object):
+
"""a staged queue to relay rsync requests to rsync workers
By "staged queue" its meant that when a consumer comes to the
@@ -1294,17 +1963,19 @@ class Syncer(object):
each completed syncjob.
"""
- def __init__(self, slave):
+ def __init__(self, slave, sync_engine, resilient_errnos=[]):
"""spawn worker threads"""
+ self.log_err = False
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
- self.bytes_synced = 0
- for i in range(int(gconf.sync_jobs)):
- t = Thread(target=self.syncjob)
+ self.sync_engine = sync_engine
+ self.errnos_ok = resilient_errnos
+ for i in range(gconf.get("sync-jobs")):
+ t = Thread(target=self.syncjob, args=(i + 1, ))
t.start()
- def syncjob(self):
+ def syncjob(self, job_id):
"""the life of a worker"""
while True:
pb = None
@@ -1317,11 +1988,17 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- po = self.slave.rsync(pb)
+ start = time.time()
+ po = self.sync_engine(pb, self.log_err)
+ logging.info(lf("Sync Time Taken",
+ job=job_id,
+ num_files=len(pb),
+ return_code=po.returncode,
+ duration="%.4f" % (time.time() - start)))
+
if po.returncode == 0:
ret = (True, 0)
- elif po.returncode in (23, 24):
- # partial transfer (cf. rsync(1)), that's normal
+ elif po.returncode in self.errnos_ok:
ret = (False, po.returncode)
else:
po.errfail()
@@ -1335,3 +2012,9 @@ class Syncer(object):
return pb
except BoxClosedErr:
pass
+
+ def enable_errorlog(self):
+ self.log_err = True
+
+ def disable_errorlog(self):
+ self.log_err = False