From cfb9c834f96dc57c47dac8d27da4266d0dab1f3f Mon Sep 17 00:00:00 2001 From: Csaba Henk Date: Sat, 2 Apr 2011 19:40:47 +0000 Subject: syncdaemon: give some refactoring to cascading code - expiry check of foreign volinfo moved back to GLUSTERServer, so that under the hood we can removexattr the expired ones; a nice side-effect is that we can use the same dict layout for foreign and native volinfo (ie., foreign needs no timeout field) - get_volinfo() is renamed to get_sys_volinfo() and most of the logic is stripped off of it (what remained there is the check against foreign master ambiguity) - volinfo transition logic is cut out to an almost purely functional static method (only impurity is the exeption raised upon forbidden volinfo change) - ping renamed to keep-alive, as something called "ping" is not supposed to have payload (yeah, keep-alive is a bit fishy on this front too, but could not come up with better...) Signed-off-by: Csaba Henk Signed-off-by: Vijay Bellur BUG: 2535 (gsync cascading) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2535 --- xlators/features/marker/utils/syncdaemon/master.py | 150 ++++++++++++--------- .../features/marker/utils/syncdaemon/resource.py | 50 ++++--- 2 files changed, 116 insertions(+), 84 deletions(-) diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 28b014e121a..2df1470d5f7 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -15,51 +15,32 @@ URXTIME = (-1, 0) class GMaster(object): - def get_volinfo(self): - vol_mark_dict_list = self.master.server.foreign_marks() - return_dict = None - if vol_mark_dict_list: - for i in range(0, len(vol_mark_dict_list)): - present_time = int (time.time()) - if (present_time < vol_mark_dict_list[i]['timeout']): - logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \ - (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'])) - if self.inter_master: - if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']): - raise RuntimeError ('more than one master present') - else: - self.inter_master = True - self.forgn_uuid = vol_mark_dict_list[i]['uuid'] - return_dict = vol_mark_dict_list[i] - else: - logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \ - (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'], - present_time)) - if self.inter_master: - self.volume_info = return_dict - if return_dict: - if self.volume_info['retval']: - raise RuntimeError ("master is corrupt") - return self.volume_info + KFGN = 0 + KNAT = 1 - self.volume_info = self.master.server.native_mark() - logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info)) - if self.volume_info: - if self.volume_info['retval']: - raise RuntimeError("master is corrupt") - return self.volume_info + def get_sys_volinfo(self): + fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ + self.master.server.native_volume_info() + fgn_vi = None + if fgn_vis: + if len(fgn_vis) > 1: + raise RuntimeError("cannot work with multiple foreign masters") + fgn_vi = fgn_vis[0] + return fgn_vi, nat_vi @property def uuid(self): - if not getattr(self, '_uuid', None): - if self.volume_info: - self._uuid = self.volume_info['uuid'] - return self._uuid + if self.volinfo: + return self.volinfo['uuid'] @property def volmark(self): - if self.volume_info: - return self.volume_info['volume_mark'] + if self.volinfo: + return self.volinfo['volume_mark'] + + @property + def inter_master(self): + return self.volinfo_state[self.KFGN] and True or False def xtime(self, path, *a, **opts): if a: @@ -96,28 +77,36 @@ class GMaster(object): self.turns = 0 self.start = None self.change_seen = None - self.forgn_uuid = None - self.orig_master = False - self.inter_master = False - self.get_volinfo() - if self.volume_info: - logging.info('master started on(UUID) : ' + self.uuid) + # the authorative (foreign, native) volinfo pair + # which lets us deduce what to do when we refetch + # the volinfos from system + self.volinfo_state = (None, None) + # the actual volinfo we make use of + self.volinfo = None - #pinger - if gconf.timeout and int(gconf.timeout) > 0: - def pinger(): + timo = int(gconf.timeout or 0) + if timo > 0: + def keep_alive(): while True: - volmark = self.get_volinfo() - if volmark: - volmark['forgn_uuid'] = True - timeout = int (time.time()) + 2 * gconf.timeout - volmark['timeout'] = timeout - - self.slave.server.ping(volmark) - time.sleep(int(gconf.timeout) * 0.5) - t = threading.Thread(target=pinger) - t.setDaemon(True) - t.start() + gap = timo * 0.5 + # first grab a reference as self.volinfo + # can be changed in main thread + vi = self.volinfo + if vi: + # then have a private copy which we can mod + vi = vi.copy() + vi['timeout'] = int(time.time()) + timo + else: + # send keep-alives more frequently to + # avoid a delay in announcing our volume info + # to slave if it becomes established in the + # meantime + gap = min(10, gap) + self.slave.server.keep_alive(vi) + time.sleep(gap) + t = threading.Thread(target=keep_alive) + t.setDaemon(True) + t.start() while True: self.crawl() @@ -146,20 +135,53 @@ class GMaster(object): self.slave.server.setattr(path, adct) self.slave.server.set_xtime(path, self.uuid, mark) + @staticmethod + def volinfo_state_machine(volinfo_state, volinfo_sys): + # store the value below "boxed" to emulate proper closures + # (variables of the enclosing scope are available inner functions + # provided they are no reassigned; mutation is OK). + relax_mismatch = [False] + def select_vi(vi0, vi): + if vi and (not vi0 or vi0['uuid'] == vi['uuid']): + # valid new value found; for the rest, we are graceful about + # uuid mismatch + relax_mismatch[0] = True + return vi + if vi0 and vi and vi0['uuid'] != vi['uuid'] and not relax_mismatch[0]: + # uuid mismatch for master candidate, bail out + raise RuntimeError("aborting on uuid change from %s to %s" % \ + (vi0['uuid'], vi['uuid'])) + # fall back to old + return vi0 + newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) + srep = lambda vi: vi and vi['uuid'][0:8] + logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ + tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) + return newstate + def crawl(self, path='.', xtl=None): if path == '.': if self.start: logging.info("crawl took %.6f" % (time.time() - self.start)) time.sleep(1) self.start = time.time() - volinfo = self.get_volinfo() - if volinfo: - if volinfo['uuid'] != self.uuid: - raise RuntimeError("master uuid mismatch") + volinfo_sys = self.get_sys_volinfo() + self.volinfo_state = self.volinfo_state_machine(self.volinfo_state, volinfo_sys) + if self.inter_master: + self.volinfo = volinfo_sys[self.KFGN] + else: + self.volinfo = volinfo_sys[self.KNAT] + if self.volinfo: + if self.volinfo['retval']: + raise RuntimeError ("master is corrupt") logging.info("Crawling as %s (%s master mode) ..." % \ - (self.uuid,self.inter_master and "intermediate" or "primary")) + (self.uuid, self.inter_master and "intermediate" or "primary")) else: - logging.info("Crawling: waiting for valid key for %s" % self.uuid) + if self.inter_master: + logging.info("Crawling: waiting for being synced from %s" % \ + self.volinfo_state[self.KFGN]['uuid']) + else: + logging.info("Crawling: waiting for volume info") return logging.debug("entering " + path) if not xtl: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index bebe5c22b92..7083b56cff4 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -98,6 +98,12 @@ class Xattr(object): if ret == -1: cls.raise_oserr() + @classmethod + def lremovexattr(cls, path, attr): + ret = cls.libc.lremovexattr(path, attr) + if ret == -1: + cls.raise_oserr() + @classmethod def llistxattr_buf(cls, path): size = cls.llistxattr(path) @@ -106,7 +112,6 @@ class Xattr(object): return cls.llistxattr(path, size) - class Server(object): GX_NSPACE = "trusted.glusterfs" @@ -205,9 +210,9 @@ class Server(object): def pid(): return os.getpid() - lastping = 0 + last_keep_alive = 0 @classmethod - def ping(cls, dct): + def keep_alive(cls, dct): if dct: key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) val = struct.pack(cls.FRGN_FMTSTR, @@ -217,8 +222,8 @@ class Server(object): Xattr.lsetxattr('.', key, val) else: logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running') - cls.lastping += 1 - return cls.lastping + cls.last_keep_alive += 1 + return cls.last_keep_alive @staticmethod def version(): @@ -238,9 +243,9 @@ class SlaveLocal(object): logging.info("slave listening") if gconf.timeout and int(gconf.timeout) > 0: while True: - lp = self.server.lastping + lp = self.server.last_keep_alive time.sleep(int(gconf.timeout)) - if lp == self.server.lastping: + if lp == self.server.last_keep_alive: logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) break else: @@ -339,7 +344,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTERServer(Server): @classmethod - def attr_unpack_dict(cls, xattr, extra_fields = ''): + def _attr_unpack_dict(cls, xattr, extra_fields = ''): fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) vm = struct.unpack(fmt_string, buf) @@ -356,27 +361,32 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): return volinfo @classmethod - def foreign_marks(cls): + def foreign_volume_infos(cls): dict_list = [] xattr_list = Xattr.llistxattr_buf('.') for ele in xattr_list: - if ele.find('trusted.glusterfs.volume-mark.') == 0: - d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) - d['timeout'] = x[0] - dict_list.append(d) + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ + (d['uuid'], x[0], x[0] - now)) + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass return dict_list @classmethod - def native_mark(cls): + def native_volume_info(cls): try: - return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) except OSError: ex = sys.exc_info()[1] - if ex.errno == ENODATA: - logging.warn("volume-mark not found") - return - else: - raise RuntimeError("master is corrupt") + if ex.errno != ENODATA: + raise server = GLUSTERServer -- cgit