diff options
| author | Kaushik BV <kaushikbv@gluster.com> | 2011-03-29 09:10:07 +0000 | 
|---|---|---|
| committer | Vijay Bellur <vijay@dev.gluster.com> | 2011-03-29 08:46:20 -0700 | 
| commit | 4597929cc527f8abaf9ef9e1d5499ea416e5c7ff (patch) | |
| tree | 29f96ebe322d250188e4c3692201a2d574dd70ed | |
| parent | 4c246c02f4ab569fca92255b7efb819243711d6b (diff) | |
Gsyncd: Cascading of gsync daemons
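This patch allows the slave of one gsyncd to be started, in turn, as the master of another gsyncd slave, so that gsync daemons can be cascaded (the middle node then syncs in "intermediate master" mode).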
Signed-off-by: Kaushik BV <kaushikbv@gluster.com>
Signed-off-by: Vijay Bellur <vijay@dev.gluster.com>
BUG: 2535 (gsync cascading)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2535
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 94 |
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 95 |
2 files changed, 155 insertions, 34 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index a275f55fbe6..dfa7a2e6f56 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -1,5 +1,6 @@  import os  import sys +import threading  import time  import stat  import signal @@ -15,20 +16,50 @@ URXTIME = (-1, 0)  class GMaster(object):      def get_volinfo(self): -        self.volume_info = self.master.server.volume_info() -        if self.volume_info['retval']: -            raise RuntimeError("master is corrupt") -        return self.volume_info +        vol_mark_dict_list = self.master.server.foreign_marks() +        return_dict = None +        if vol_mark_dict_list: +            for i in range(0, len(vol_mark_dict_list)): +                present_time = int (time.time()) +                if (present_time < vol_mark_dict_list[i]['timeout']): +                    logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \ +                                  (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'])) +                    if self.inter_master: +                        if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']): +                            raise RuntimeError ('more than one master present') +                    else: +                        self.inter_master = True +                        self.forgn_uuid = vol_mark_dict_list[i]['uuid'] +                    return_dict = vol_mark_dict_list[i] +                else: +                    logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \ +                                  (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'], +                                    present_time)) +        if self.inter_master: +            self.volume_info = return_dict +            if return_dict: +                if self.volume_info['retval']: +     
               raise RuntimeError ("master is corrupt") +            return self.volume_info + +        self.volume_info =  self.master.server.native_mark() +        logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info)) +        if self.volume_info: +            if self.volume_info['retval']: +                raise RuntimeError("master is corrupt") +            return self.volume_info      @property      def uuid(self):          if not getattr(self, '_uuid', None): -            self._uuid = self.volume_info['uuid'] +            if self.volume_info: +                self._uuid = self.volume_info['uuid']          return self._uuid      @property      def volmark(self): -        return self.volume_info['volume_mark'] +        if self.volume_info: +            return self.volume_info['volume_mark']      def xtime(self, path, *a, **opts):          if a: @@ -36,31 +67,57 @@ class GMaster(object):          else:              rsc = self.master          if not 'create' in opts: -            opts['create'] = rsc == self.master +            opts['create'] = (rsc == self.master and not self.inter_master) +        if not 'default_xtime' in opts: +            if self.inter_master: +                opts['default_xtime'] = ENODATA +            else: +                opts['default_xtime'] = URXTIME          xt = rsc.server.xtime(path, self.uuid)          if isinstance(xt, int) and xt != ENODATA:              return xt -        if (xt == ENODATA or xt < self.volmark) and opts['create']: +        invalid_xtime = (xt == ENODATA or xt < self.volmark) +        if invalid_xtime and opts['create']:              t = time.time()              sec = int(t)              nsec = int((t - sec) * 1000000)              xt = (sec, nsec)              rsc.server.set_xtime(path, self.uuid, xt) -        if xt == ENODATA: -            xt = URXTIME +        if invalid_xtime: +            xt = opts['default_xtime']          return xt      def __init__(self, master, slave):          
self.master = master          self.slave = slave -        self.get_volinfo()          self.jobtab = {}          self.syncer = Syncer(slave)          self.total_turns = int(gconf.turns)          self.turns = 0          self.start = None          self.change_seen = None -        logging.info('master started on ' + self.uuid) +        self.forgn_uuid = None +        self.orig_master = False +        self.inter_master = False +        self.get_volinfo() +        if self.volume_info: +            logging.info('master started on(UUID) : ' + self.uuid) + +        #pinger +        if gconf.timeout and int(gconf.timeout) > 0: +            def pinger(): +                while True: +                    volmark = self.get_volinfo() +                    if volmark: +                        volmark['forgn_uuid'] = True +                        timeout = int (time.time()) + 2 * gconf.timeout +                        volmark['timeout'] = timeout + +                    self.slave.server.ping(volmark) +                    time.sleep(int(gconf.timeout) * 0.5) +        t = threading.Thread(target=pinger) +        t.setDaemon(True) +        t.start()          while True:              self.crawl() @@ -95,10 +152,15 @@ class GMaster(object):                  logging.info("crawl took %.6f" % (time.time() - self.start))              time.sleep(1)              self.start = time.time() -            logging.info("crawling...") -            self.get_volinfo() -            if self.volume_info['uuid'] != self.uuid: -                raise RuntimeError("master uuid mismatch") +            volinfo = self.get_volinfo() +            if volinfo: +                if volinfo['uuid'] != self.uuid: +                    raise RuntimeError("master uuid mismatch") +                logging.info("Crawling as %s (%s master mode) ..." 
% \ +                             (self.uuid,self.inter_master and "intermediate" or "primary")) +            else: +                logging.info("Crawling: waiting for valid key for %s" % self.uuid) +                return          logging.debug("entering " + path)          if not xtl:              xtl = self.xtime(path) diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index ad0d98bad13..8556e4246f7 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -10,6 +10,7 @@ import socket  import logging  import tempfile  import threading +import time  from ctypes import *  from ctypes.util import find_library  from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR @@ -67,12 +68,12 @@ class Xattr(object):          raise OSError(errn, os.strerror(errn))      @classmethod -    def lgetxattr(cls, path, attr, siz=0): +    def _query_xattr(cls, path, siz, syscall, *a):          if siz:              buf = create_string_buffer('\0' * siz)          else:              buf = None -        ret = cls.libc.lgetxattr(path, attr, buf, siz) +        ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))          if ret == -1:              cls.raise_oserr()          if siz: @@ -81,15 +82,37 @@ class Xattr(object):              return ret      @classmethod +    def lgetxattr(cls, path, attr, siz=0): +        return cls._query_xattr( path, siz, 'lgetxattr', attr) + +    @classmethod +    def llistxattr(cls, path, siz=0): +        ret = cls._query_xattr(path, siz, 'llistxattr') +        if isinstance(ret, str): +            ret = ret.split('\0') +        return ret + +    @classmethod      def lsetxattr(cls, path, attr, val):          ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)          if ret == -1:              cls.raise_oserr() +    @classmethod +    def llistxattr_buf(cls, path): +        size = cls.llistxattr(path) 
+        if size  == -1: +            raise_oserr() +        return cls.llistxattr(path, size) + +  class Server(object):      GX_NSPACE = "trusted.glusterfs" +    NTV_FMTSTR = "!" + "B"*19 + "II" +    FRGN_XTRA_FMT = "I" +    FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT      @staticmethod      def entries(path): @@ -184,7 +207,16 @@ class Server(object):      lastping = 0      @classmethod -    def ping(cls): +    def ping(cls, dct): +        if dct: +            key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) +            val = struct.pack(cls.FRGN_FMTSTR, +                              *(dct['version']  + +                                tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + +                                (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) +            Xattr.lsetxattr('.', key, val) +        else: +            logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running')          cls.lastping += 1          return cls.lastping @@ -243,14 +275,6 @@ class SlaveRemote(object):                  da1[i][k] = int(v)          if da1[0] != da1[1]:              raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) -        if gconf.timeout and int(gconf.timeout) > 0: -            def pinger(): -                while True: -                    self.server.ping() -                    time.sleep(int(gconf.timeout) * 0.5) -            t = threading.Thread(target=pinger) -            t.setDaemon(True) -            t.start()      def rsync(self, files, *args):          if not files: @@ -314,16 +338,51 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      class GLUSTERServer(Server): +        forgn_mark_size = struct.calcsize(Server.FRGN_FMTSTR) +        nativ_mark_size = struct.calcsize(Server.NTV_FMTSTR) +          @classmethod -        def volume_info(cls): -            vm = struct.unpack('!' 
+ 'B'*19 + 'II', -                               Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27)) +        def attr_unpack_dict(cls, xattr, extra_fields = ''): +            fmt_string = cls.NTV_FMTSTR + extra_fields +            buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) +            vm = struct.unpack(fmt_string, buf) +            logging.info("str: %s" % `vm`)              m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))              uuid = '-'.join(m.groups()) -            return { 'version': vm[0:2], -                     'uuid'   : uuid, -                     'retval' : vm[18], -                     'volume_mark': vm[-2:] } +            volinfo = {  'version': vm[0:2], +                         'uuid'   : uuid, +                         'retval' : vm[18], +                         'volume_mark': vm[18:20], +                      } +            logging.info("volinfo: %s" % `volinfo`) +            if extra_fields: +                return volinfo, vm[-len(extra_fields):] +            else: +                return volinfo + +        @classmethod +        def foreign_marks(cls): +            dict_list = [] +            xattr_list = Xattr.llistxattr_buf('.') +            for ele in xattr_list: +                if (ele.find('trusted.glusterfs.volume-mark') != -1): +                    #buf = Xattr.lgetxattr('.', ele, cls.forgn_mark_size) +                    d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) +                    d['timeout'] = x[0] +                    dict_list.append(d) +            return dict_list + +        @classmethod +        def native_mark(cls): +            try: +                return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) +            except OSError: +                ex = sys.exc_info()[1] +                if ex.errno == ENODATA: +                    logging.warn("volume-mark not found") +                    return +                
else: +                    raise RuntimeError("master is corrupt")      server = GLUSTERServer  | 
