summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCsaba Henk <csaba@gluster.com>2011-04-02 19:40:47 +0000
committerVijay Bellur <vijay@dev.gluster.com>2011-04-04 08:02:18 -0700
commitcfb9c834f96dc57c47dac8d27da4266d0dab1f3f (patch)
tree7a2ec3fb364030298e5b59f3ba6e380512fe0533
parentf007eb1a0701cd3e13e6ba67208cd1db9325a370 (diff)
syncdaemon: give some refactoring to cascading code
- expiry check of foreign volinfo moved back to GLUSTERServer, so that under the hood we can removexattr the expired ones; a nice side-effect is that we can use the same dict layout for foreign and native volinfo (ie., foreign needs no timeout field) - get_volinfo() is renamed to get_sys_volinfo() and most of the logic is stripped off of it (what remained there is the check against foreign master ambiguity) - volinfo transition logic is cut out to an almost purely functional static method (only impurity is the exeption raised upon forbidden volinfo change) - ping renamed to keep-alive, as something called "ping" is not supposed to have payload (yeah, keep-alive is a bit fishy on this front too, but could not come up with better...) Signed-off-by: Csaba Henk <csaba@gluster.com> Signed-off-by: Vijay Bellur <vijay@dev.gluster.com> BUG: 2535 (gsync cascading) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2535
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py150
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py50
2 files changed, 116 insertions, 84 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index 28b014e12..2df1470d5 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -15,51 +15,32 @@ URXTIME = (-1, 0)
class GMaster(object):
- def get_volinfo(self):
- vol_mark_dict_list = self.master.server.foreign_marks()
- return_dict = None
- if vol_mark_dict_list:
- for i in range(0, len(vol_mark_dict_list)):
- present_time = int (time.time())
- if (present_time < vol_mark_dict_list[i]['timeout']):
- logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \
- (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout']))
- if self.inter_master:
- if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']):
- raise RuntimeError ('more than one master present')
- else:
- self.inter_master = True
- self.forgn_uuid = vol_mark_dict_list[i]['uuid']
- return_dict = vol_mark_dict_list[i]
- else:
- logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \
- (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'],
- present_time))
- if self.inter_master:
- self.volume_info = return_dict
- if return_dict:
- if self.volume_info['retval']:
- raise RuntimeError ("master is corrupt")
- return self.volume_info
+ KFGN = 0
+ KNAT = 1
- self.volume_info = self.master.server.native_mark()
- logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info))
- if self.volume_info:
- if self.volume_info['retval']:
- raise RuntimeError("master is corrupt")
- return self.volume_info
+ def get_sys_volinfo(self):
+ fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
+ self.master.server.native_volume_info()
+ fgn_vi = None
+ if fgn_vis:
+ if len(fgn_vis) > 1:
+ raise RuntimeError("cannot work with multiple foreign masters")
+ fgn_vi = fgn_vis[0]
+ return fgn_vi, nat_vi
@property
def uuid(self):
- if not getattr(self, '_uuid', None):
- if self.volume_info:
- self._uuid = self.volume_info['uuid']
- return self._uuid
+ if self.volinfo:
+ return self.volinfo['uuid']
@property
def volmark(self):
- if self.volume_info:
- return self.volume_info['volume_mark']
+ if self.volinfo:
+ return self.volinfo['volume_mark']
+
+ @property
+ def inter_master(self):
+ return self.volinfo_state[self.KFGN] and True or False
def xtime(self, path, *a, **opts):
if a:
@@ -96,28 +77,36 @@ class GMaster(object):
self.turns = 0
self.start = None
self.change_seen = None
- self.forgn_uuid = None
- self.orig_master = False
- self.inter_master = False
- self.get_volinfo()
- if self.volume_info:
- logging.info('master started on(UUID) : ' + self.uuid)
+ # the authorative (foreign, native) volinfo pair
+ # which lets us deduce what to do when we refetch
+ # the volinfos from system
+ self.volinfo_state = (None, None)
+ # the actual volinfo we make use of
+ self.volinfo = None
- #pinger
- if gconf.timeout and int(gconf.timeout) > 0:
- def pinger():
+ timo = int(gconf.timeout or 0)
+ if timo > 0:
+ def keep_alive():
while True:
- volmark = self.get_volinfo()
- if volmark:
- volmark['forgn_uuid'] = True
- timeout = int (time.time()) + 2 * gconf.timeout
- volmark['timeout'] = timeout
-
- self.slave.server.ping(volmark)
- time.sleep(int(gconf.timeout) * 0.5)
- t = threading.Thread(target=pinger)
- t.setDaemon(True)
- t.start()
+ gap = timo * 0.5
+ # first grab a reference as self.volinfo
+ # can be changed in main thread
+ vi = self.volinfo
+ if vi:
+ # then have a private copy which we can mod
+ vi = vi.copy()
+ vi['timeout'] = int(time.time()) + timo
+ else:
+ # send keep-alives more frequently to
+ # avoid a delay in announcing our volume info
+ # to slave if it becomes established in the
+ # meantime
+ gap = min(10, gap)
+ self.slave.server.keep_alive(vi)
+ time.sleep(gap)
+ t = threading.Thread(target=keep_alive)
+ t.setDaemon(True)
+ t.start()
while True:
self.crawl()
@@ -146,20 +135,53 @@ class GMaster(object):
self.slave.server.setattr(path, adct)
self.slave.server.set_xtime(path, self.uuid, mark)
+ @staticmethod
+ def volinfo_state_machine(volinfo_state, volinfo_sys):
+ # store the value below "boxed" to emulate proper closures
+ # (variables of the enclosing scope are available inner functions
+ # provided they are no reassigned; mutation is OK).
+ relax_mismatch = [False]
+ def select_vi(vi0, vi):
+ if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
+ # valid new value found; for the rest, we are graceful about
+ # uuid mismatch
+ relax_mismatch[0] = True
+ return vi
+ if vi0 and vi and vi0['uuid'] != vi['uuid'] and not relax_mismatch[0]:
+ # uuid mismatch for master candidate, bail out
+ raise RuntimeError("aborting on uuid change from %s to %s" % \
+ (vi0['uuid'], vi['uuid']))
+ # fall back to old
+ return vi0
+ newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
+ srep = lambda vi: vi and vi['uuid'][0:8]
+ logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
+ tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
+ return newstate
+
def crawl(self, path='.', xtl=None):
if path == '.':
if self.start:
logging.info("crawl took %.6f" % (time.time() - self.start))
time.sleep(1)
self.start = time.time()
- volinfo = self.get_volinfo()
- if volinfo:
- if volinfo['uuid'] != self.uuid:
- raise RuntimeError("master uuid mismatch")
+ volinfo_sys = self.get_sys_volinfo()
+ self.volinfo_state = self.volinfo_state_machine(self.volinfo_state, volinfo_sys)
+ if self.inter_master:
+ self.volinfo = volinfo_sys[self.KFGN]
+ else:
+ self.volinfo = volinfo_sys[self.KNAT]
+ if self.volinfo:
+ if self.volinfo['retval']:
+ raise RuntimeError ("master is corrupt")
logging.info("Crawling as %s (%s master mode) ..." % \
- (self.uuid,self.inter_master and "intermediate" or "primary"))
+ (self.uuid, self.inter_master and "intermediate" or "primary"))
else:
- logging.info("Crawling: waiting for valid key for %s" % self.uuid)
+ if self.inter_master:
+ logging.info("Crawling: waiting for being synced from %s" % \
+ self.volinfo_state[self.KFGN]['uuid'])
+ else:
+ logging.info("Crawling: waiting for volume info")
return
logging.debug("entering " + path)
if not xtl:
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index bebe5c22b..7083b56cf 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -99,6 +99,12 @@ class Xattr(object):
cls.raise_oserr()
@classmethod
+ def lremovexattr(cls, path, attr):
+ ret = cls.libc.lremovexattr(path, attr)
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
def llistxattr_buf(cls, path):
size = cls.llistxattr(path)
if size == -1:
@@ -106,7 +112,6 @@ class Xattr(object):
return cls.llistxattr(path, size)
-
class Server(object):
GX_NSPACE = "trusted.glusterfs"
@@ -205,9 +210,9 @@ class Server(object):
def pid():
return os.getpid()
- lastping = 0
+ last_keep_alive = 0
@classmethod
- def ping(cls, dct):
+ def keep_alive(cls, dct):
if dct:
key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
val = struct.pack(cls.FRGN_FMTSTR,
@@ -217,8 +222,8 @@ class Server(object):
Xattr.lsetxattr('.', key, val)
else:
logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running')
- cls.lastping += 1
- return cls.lastping
+ cls.last_keep_alive += 1
+ return cls.last_keep_alive
@staticmethod
def version():
@@ -238,9 +243,9 @@ class SlaveLocal(object):
logging.info("slave listening")
if gconf.timeout and int(gconf.timeout) > 0:
while True:
- lp = self.server.lastping
+ lp = self.server.last_keep_alive
time.sleep(int(gconf.timeout))
- if lp == self.server.lastping:
+ if lp == self.server.last_keep_alive:
logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
break
else:
@@ -339,7 +344,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTERServer(Server):
@classmethod
- def attr_unpack_dict(cls, xattr, extra_fields = ''):
+ def _attr_unpack_dict(cls, xattr, extra_fields = ''):
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
vm = struct.unpack(fmt_string, buf)
@@ -356,27 +361,32 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
return volinfo
@classmethod
- def foreign_marks(cls):
+ def foreign_volume_infos(cls):
dict_list = []
xattr_list = Xattr.llistxattr_buf('.')
for ele in xattr_list:
- if ele.find('trusted.glusterfs.volume-mark.') == 0:
- d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
- d['timeout'] = x[0]
- dict_list.append(d)
+ if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
+ d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
+ now = int(time.time())
+ if x[0] > now:
+ logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
+ (d['uuid'], x[0], x[0] - now))
+ dict_list.append(d)
+ else:
+ try:
+ Xattr.lremovexattr('.', ele)
+ except OSError:
+ pass
return dict_list
@classmethod
- def native_mark(cls):
+ def native_volume_info(cls):
try:
- return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
except OSError:
ex = sys.exc_info()[1]
- if ex.errno == ENODATA:
- logging.warn("volume-mark not found")
- return
- else:
- raise RuntimeError("master is corrupt")
+ if ex.errno != ENODATA:
+ raise
server = GLUSTERServer