diff options
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 799 | 
1 files changed, 573 insertions, 226 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 95810a61ee1..721fe18bd18 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -10,15 +10,16 @@ import socket  import string  import errno  from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST +from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode  from threading import currentThread, Condition, Lock  from datetime import datetime +from libcxattr import Xattr  from gconf import gconf  from tempfile import mkdtemp, NamedTemporaryFile  from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \                         unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ -                       lstat, errno_wrap +                       lstat, errno_wrap, update_file  URXTIME = (-1, 0) @@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None):      crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')      sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin      purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin -    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): +    syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine +    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):          pass      return _GMaster @@ -101,14 +103,17 @@ class NormalMixin(object):          if not 'default_xtime' in opts:              opts['default_xtime'] = URXTIME -    def xtime_low(self, server, path, **opts): -        xt = server.xtime(path, self.uuid) +    def xtime_low(self, rsc, path, **opts): +        if rsc == self.master: +            xt = rsc.server.xtime(path, self.uuid) +        else: +            xt = rsc.server.stime(path, self.uuid)          if isinstance(xt, int) and xt != ENODATA:              return xt          if xt == ENODATA or xt < self.volmark:              if opts['create']:                  xt = _xtime_now() -                server.aggregated.set_xtime(path, self.uuid, xt) +                rsc.server.aggregated.set_xtime(path, self.uuid, xt)              else:                  xt = opts['default_xtime']          return xt @@ -140,7 +145,7 @@ class NormalMixin(object):          return xte > xtrd      def set_slave_xtime(self, path, mark): -        self.slave.server.set_xtime(path, self.uuid, mark) +        self.slave.server.set_stime(path, self.uuid, mark)          self.slave.server.set_xtime_remote(path, self.uuid, mark)  class PartialMixin(NormalMixin): @@ -190,6 +195,65 @@ class PurgeNoopMixin(object):      def purge_missing(self, path, names):          pass +class TarSSHEngine(object): +    """Sync engine that uses tar(1) piped over ssh(1) +       for data transfers. Good for lots of small files. +    """ +    def a_syncdata(self, files): +        logging.debug('files: %s' % (files)) +        for f in files: +            pb = self.syncer.add(f) +            def regjob(se, xte, pb): +                rv = pb.wait() +                if rv[0]: +                    logging.debug('synced ' + se) +                    return True +                else: +                    # stat check for file presence +                    st = lstat(se) +                    if isinstance(st, int): +                        return True +                    logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) +            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + +    def syncdata_wait(self): +        if self.wait(self.FLAT_DIR_HIERARCHY, None): +            return True + +    def syncdata(self, files): +        self.a_syncdata(files) +        self.syncdata_wait() + +class RsyncEngine(object): +    """Sync engine that uses rsync(1) for data transfers""" +    def a_syncdata(self, files): +        logging.debug('files: %s' % (files)) +        for f in files: +            logging.debug('candidate for syncing %s' % f) +            pb = self.syncer.add(f) +            def regjob(se, xte, pb): +                rv = pb.wait() +                if rv[0]: +                    logging.debug('synced ' + se) +                    return True +                else: +                    if rv[1] in [23, 24]: +                        # stat to check if the file exist +                        st = lstat(se) +                        if isinstance(st, int): +                            # file got unlinked in the interim +                            return True +                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) +            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + +    def syncdata_wait(self): +        if self.wait(self.FLAT_DIR_HIERARCHY, None): +            return True + +    def syncdata(self, files): +        self.a_syncdata(files) +        self.syncdata_wait() +  class GMasterCommon(object):      """abstract class impementling master role""" @@ -234,7 +298,7 @@ class GMasterCommon(object):          else:              rsc = self.master          self.make_xtime_opts(rsc == self.master, opts) -        return self.xtime_low(rsc.server, path, **opts) +        return self.xtime_low(rsc, path, **opts)      def get_initial_crawl_data(self):          # while persisting only 'files_syncd' is non-zero, rest of @@ -243,18 +307,26 @@ class GMasterCommon(object):          default_data = {'files_syncd': 0,                          'files_remaining': 0,                          'bytes_remaining': 0, -                        'purges_remaining': 0} +                        'purges_remaining': 0, +                        'total_files_skipped': 0}          if getattr(gconf, 'state_detail_file', None):              try: -                return json.load(open(gconf.state_detail_file)) -            except (IOError, OSError): +                with open(gconf.state_detail_file, 'r+') as f: +                    loaded_data= json.load(f) +                    diff_data = set(default_data) - set (loaded_data) +                    if len(diff_data): +                        for i in diff_data: +                            loaded_data[i] = default_data[i] +                    return loaded_data +            except (IOError):                  ex = sys.exc_info()[1] -                if ex.errno == ENOENT: -                    # Create file with initial data +                logging.warn ('Creating new gconf.state_detail_file.') +                # Create file with initial data +                try:                      with open(gconf.state_detail_file, 'wb') as f:                          json.dump(default_data, f)                      return default_data -                else: +                except:                      raise          return default_data @@ -264,6 +336,8 @@ class GMasterCommon(object):                  same_dir = os.path.dirname(gconf.state_detail_file)                  with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:                      json.dump(self.total_crawl_stats, tmp) +                    tmp.flush() +                    os.fsync(tmp.fileno())                      os.rename(tmp.name, gconf.state_detail_file)              except (IOError, OSError):                  raise @@ -272,7 +346,13 @@ class GMasterCommon(object):          self.master = master          self.slave = slave          self.jobtab = {} -        self.syncer = Syncer(slave) +        if boolify(gconf.use_tarssh): +            logging.info("using 'tar over ssh' as the sync engine") +            self.syncer = Syncer(slave, self.slave.tarssh) +        else: +            logging.info("using 'rsync' as the sync engine") +            # partial transfer (cf. rsync(1)), that's normal +            self.syncer = Syncer(slave, self.slave.rsync, [23, 24])          # crawls vs. turns:          # - self.crawls is simply the number of crawl() invocations on root          # - one turn is a maximal consecutive sequence of crawls so that each @@ -294,6 +374,8 @@ class GMasterCommon(object):          self.terminate = False          self.sleep_interval = 1          self.checkpoint_thread = None +        self.current_files_skipped_count = 0 +        self.skipped_gfid_list = []      def init_keep_alive(cls):          """start the keep-alive thread """ @@ -336,7 +418,8 @@ class GMasterCommon(object):          gconf.configinterface.set('volume_id', self.uuid)          if self.volinfo:              if self.volinfo['retval']: -                raise GsyncdError("master is corrupt") +                logging.warn("master cluster's info may not be valid %d" % \ +                             self.volinfo['retval'])              self.start_checkpoint_thread()          else:              raise GsyncdError("master volinfo unavailable") @@ -349,7 +432,7 @@ class GMasterCommon(object):          while not self.terminate:              if self.start:                  logging.debug("... crawl #%d done, took %.6f seconds" % \ -                                  (self.crawls, time.time() - self.start)) +                              (self.crawls, time.time() - self.start))              self.start = time.time()              should_display_info = self.start - self.lastreport['time'] >= 60              if should_display_info: @@ -363,9 +446,20 @@ class GMasterCommon(object):              if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds                  crawl = self.should_crawl()                  t0 = t1 +            self.update_worker_remote_node()              if not crawl: +                self.update_worker_health("Passive") +                # bring up _this_ brick to the cluster stime +                # which is min of cluster (but max of the replicas) +                brick_stime = self.xtime('.', self.slave) +                cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) +                logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) +                if not isinstance(cluster_stime, int): +                    if brick_stime < cluster_stime: +                        self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)                  time.sleep(5)                  continue +            self.update_worker_health("Active")              self.crawl()              if oneshot:                  return @@ -375,7 +469,7 @@ class GMasterCommon(object):      def _checkpt_param(cls, chkpt, prm, xtimish=True):          """use config backend to lookup a parameter belonging to             checkpoint @chkpt""" -        cprm = getattr(gconf, 'checkpoint_' + prm, None) +        cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)          if not cprm:              return          chkpt_mapped, val = cprm.split(':', 1) @@ -402,17 +496,6 @@ class GMasterCommon(object):              ts += '.' + str(tpair[1])          return ts -    def get_extra_info(self): -        str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ -            (self._crawl_time_format(datetime.now() - self.crawl_start), \ -                 self.total_crawl_stats['files_syncd'], \ -                 self.total_crawl_stats['files_remaining'], \ -                 self.total_crawl_stats['bytes_remaining'], \ -                 self.total_crawl_stats['purges_remaining']) -        str_info += '\0' -        logging.debug(str_info) -        return str_info -      def _crawl_time_format(self, crawl_time):          # Ex: 5 years, 4 days, 20:23:10          years, days = divmod(crawl_time.days, 365.25) @@ -431,27 +514,49 @@ class GMasterCommon(object):          date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))          return date -    def checkpt_service(self, chan, chkpt, tgt): +    def checkpt_service(self, chan, chkpt):          """checkpoint service loop          monitor and verify checkpoint status for @chkpt, and listen          for incoming requests for whom we serve a pretty-formatted          status report""" -        if not chkpt: -            # dummy loop for the case when there is no checkpt set -            while True: +        while True: +            chkpt = gconf.configinterface.get_realtime("checkpoint") +            if not chkpt: +                gconf.configinterface.delete("checkpoint_completed") +                gconf.configinterface.delete("checkpoint_target") +                # dummy loop for the case when there is no checkpt set                  select([chan], [], [])                  conn, _ = chan.accept() -                conn.send(self.get_extra_info()) +                conn.send('\0')                  conn.close() -        completed = self._checkpt_param(chkpt, 'completed', xtimish=False) -        if completed: -            completed = tuple(int(x) for x in completed.split('.')) -        while True: +                continue + +            checkpt_tgt = self._checkpt_param(chkpt, 'target') +            if not checkpt_tgt: +                checkpt_tgt = self.xtime('.') +                if isinstance(checkpt_tgt, int): +                    raise GsyncdError("master root directory is unaccessible (%s)", +                                      os.strerror(checkpt_tgt)) +                self._set_checkpt_param(chkpt, 'target', checkpt_tgt) +            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ +                          (repr(checkpt_tgt), chkpt)) + +            # check if the label is 'now' +            chkpt_lbl = chkpt +            try: +                x1,x2 = chkpt.split(':') +                if x1 == 'now': +                    chkpt_lbl = "as of " + self.humantime(x2) +            except: +                pass +            completed = self._checkpt_param(chkpt, 'completed', xtimish=False) +            if completed: +                completed = tuple(int(x) for x in completed.split('.'))              s,_,_ = select([chan], [], [], (not completed) and 5 or None)              # either request made and we re-check to not              # give back stale data, or we still hunting for completion -            if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: +            if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:                  # indexing has been reset since setting the checkpoint                  status = "is invalid"              else: @@ -459,12 +564,12 @@ class GMasterCommon(object):                  if isinstance(xtr, int):                      raise GsyncdError("slave root directory is unaccessible (%s)",                                        os.strerror(xtr)) -                ncompleted = self.xtime_geq(xtr, tgt) +                ncompleted = self.xtime_geq(xtr, checkpt_tgt)                  if completed and not ncompleted: # stale data                      logging.warn("completion time %s for checkpoint %s became stale" % \                                   (self.humantime(*completed), chkpt))                      completed = None -                    gconf.confdata.delete('checkpoint-completed') +                    gconf.configinterface.delete('checkpoint_completed')                  if ncompleted and not completed: # just reaching completion                      completed = "%.6f" % time.time()                      self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) @@ -478,7 +583,7 @@ class GMasterCommon(object):                  try:                      conn, _ = chan.accept()                      try: -                        conn.send("  | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) +                        conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))                      except:                          exc = sys.exc_info()[1]                          if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -505,18 +610,8 @@ class GMasterCommon(object):                  pass          chan.bind(state_socket)          chan.listen(1) -        checkpt_tgt = None -        if gconf.checkpoint: -            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') -            if not checkpt_tgt: -                checkpt_tgt = self.xtime('.') -                if isinstance(checkpt_tgt, int): -                    raise GsyncdError("master root directory is unaccessible (%s)", -                                      os.strerror(checkpt_tgt)) -                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) -            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ -                          (repr(checkpt_tgt), gconf.checkpoint)) -        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) +        chkpt = gconf.configinterface.get_realtime("checkpoint") +        t = Thread(target=self.checkpt_service, args=(chan, chkpt))          t.start()          self.checkpoint_thread = t @@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon):      POS_GFID   = 0      POS_TYPE   = 1 -    POS_ENTRY1 = 2 -    POS_ENTRY2 = 3  # renames - -    _CL_TYPE_DATA_PFX     = "D " -    _CL_TYPE_METADATA_PFX = "M " -    _CL_TYPE_ENTRY_PFX    = "E " +    POS_ENTRY1 = -1 -    TYPE_GFID  = [_CL_TYPE_DATA_PFX] # ignoring metadata ops -    TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] +    TYPE_META  = "M " +    TYPE_GFID  = "D " +    TYPE_ENTRY = "E "      # flat directory heirarchy for gfid based access      FLAT_DIR_HIERARCHY = '.' @@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon):          logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))          return (workdir, logfile) -    # update stats from *this* crawl -    def update_cumulative_stats(self, files_pending): -        self.total_crawl_stats['files_remaining']  = files_pending['count'] -        self.total_crawl_stats['bytes_remaining']  = files_pending['bytes'] -        self.total_crawl_stats['purges_remaining'] = files_pending['purge'] - -    # sync data -    def syncdata(self, datas): -        logging.debug('datas: %s' % (datas)) -        for data in datas: -            logging.debug('candidate for syncing %s' % data) -            pb = self.syncer.add(data) -            def regjob(se, xte, pb): -                rv = pb.wait() -                if rv[0]: -                    logging.debug('synced ' + se) -                    return True -                else: -                    if rv[1] in [23, 24]: -                        # stat to check if the file exist -                        st = lstat(se) -                        if isinstance(st, int): -                            # file got unlinked in the interim -                            return True -                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) -            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) -        if self.wait(self.FLAT_DIR_HIERARCHY, None): -            return True -      def process_change(self, change, done, retry):          pfx = gauxpfx()          clist   = []          entries = [] +        meta_gfid = set()          datas = set()          # basic crawl stats: files and bytes @@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon):                      dct[k] = ed[k]              return dct -        # regular file update: bytes & count -        def _update_reg(entry, size): -            if not entry in files_pending['files']: -                files_pending['count'] += 1 -                files_pending['bytes'] += size -                files_pending['files'].append(entry) -        # updates for directories, symlinks etc.. -        def _update_rest(): +        # entry counts (not purges) +        def entry_update():              files_pending['count'] += 1 -        # entry count -        def entry_update(entry, size, mode): -            if stat.S_ISREG(mode): -                _update_reg(entry, size) -            else: -                _update_rest()          # purge count          def purge_update():              files_pending['purge'] += 1          for e in clist:              e = e.strip() -            et = e[self.IDX_START:self.IDX_END] -            ec = e[self.IDX_END:].split(' ') -            if et in self.TYPE_ENTRY: +            et = e[self.IDX_START:self.IDX_END]   # entry type +            ec = e[self.IDX_END:].split(' ')      # rest of the bits + +            if et == self.TYPE_ENTRY: +                # extract information according to the type of +                # the entry operation. create(), mkdir() and mknod() +                # have mode, uid, gid information in the changelog +                # itself, so no need to stat()...                  ty = ec[self.POS_TYPE] + +                # PARGFID/BNAME                  en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) +                # GFID of the entry                  gfid = ec[self.POS_GFID] -                # definitely need a better way bucketize entry ops +                  if ty in ['UNLINK', 'RMDIR']:                      purge_update()                      entries.append(edct(ty, gfid=gfid, entry=en)) -                    continue -                go = os.path.join(pfx, gfid) -                st = lstat(go) -                if isinstance(st, int): -		    if ty == 'RENAME': -                        entries.append(edct('UNLINK', gfid=gfid, entry=en)) -		    else: -                        logging.debug('file %s got purged in the interim' % go) -                    continue -                entry_update(go, st.st_size, st.st_mode) -                if ty in ['CREATE', 'MKDIR', 'MKNOD']: -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) -                elif ty == 'LINK': -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) -                elif ty == 'SYMLINK': -                    rl = errno_wrap(os.readlink, [en], [ENOENT]) -                    if isinstance(rl, int): -                        continue -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) -                elif ty == 'RENAME': -                    e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) -                    entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) +                elif ty in ['CREATE', 'MKDIR', 'MKNOD']: +                    entry_update() +                    # stat information present in the changelog itself +                    entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ +                                        uid=int(ec[3]), gid=int(ec[4])))                  else: -                    logging.warn('ignoring %s [op %s]' % (gfid, ty)) -            elif et in self.TYPE_GFID: -                go = os.path.join(pfx, ec[0]) -                st = lstat(go) -                if isinstance(st, int): -                    logging.debug('file %s got purged in the interim' % go) -                    continue -                entry_update(go, st.st_size, st.st_mode) -                datas.update([go]) +                    # stat() to get mode and other information +                    go = os.path.join(pfx, gfid) +                    st = lstat(go) +                    if isinstance(st, int): +                        if ty == 'RENAME': # special hack for renames... +                            entries.append(edct('UNLINK', gfid=gfid, entry=en)) +                        else: +                            logging.debug('file %s got purged in the interim' % go) +                        continue + +                    if ty == 'LINK': +                        entry_update() +                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) +                    elif ty == 'SYMLINK': +                        rl = errno_wrap(os.readlink, [en], [ENOENT]) +                        if isinstance(rl, int): +                            continue +                        entry_update() +                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) +                    elif ty == 'RENAME': +                        entry_update() +                        e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) +                        entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) +                    else: +                        logging.warn('ignoring %s [op %s]' % (gfid, ty)) +            elif et == self.TYPE_GFID: +                datas.add(os.path.join(pfx, ec[0])) +            elif et == self.TYPE_META: +                if ec[1] == 'SETATTR': # only setattr's for now... +                    meta_gfid.add(os.path.join(pfx, ec[0])) +            else: +                logging.warn('got invalid changelog type: %s' % (et))          logging.debug('entries: %s' % repr(entries))          if not retry: -            self.update_cumulative_stats(files_pending) +            self.update_worker_cumilitive_status(files_pending)          # sync namespace          if (entries):              self.slave.server.entry_ops(entries) +        # sync metadata +        if (meta_gfid): +            meta_entries = [] +            for go in meta_gfid: +                st = lstat(go) +                if isinstance(st, int): +                    logging.debug('file %s got purged in the interim' % go) +                    continue +                meta_entries.append(edct('META', go=go, stat=st)) +            if meta_entries: +                self.slave.server.meta_ops(meta_entries)          # sync data -        if self.syncdata(datas): -            if done: -                self.master.server.changelog_done(change) -            return True - -    def sync_done(self): -        self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] -        self.total_crawl_stats['files_remaining']  = 0 -        self.total_crawl_stats['bytes_remaining']  = 0 -        self.total_crawl_stats['purges_remaining'] = 0 -        self.update_crawl_data() +        if datas: +            self.a_syncdata(datas)      def process(self, changes, done=1): -        for change in changes: -            tries = 0 -            retry = False -            while True: -                logging.debug('processing change %s' % change) -                if self.process_change(change, done, retry): -                    self.sync_done() -                    break -                retry = True -                tries += 1 -                if tries == self.MAX_RETRIES: -                    logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change)) -                    self.sync_done() -                    if done: -                        self.master.server.changelog_done(change) -                    break -                # it's either entry_ops() or Rsync that failed to do it's -                # job. Mostly it's entry_ops() [which currently has a problem -                # of failing to create an entry but failing to return an errno] -                # Therefore we do not know if it's either Rsync or the freaking -                # entry_ops() that failed... so we retry the _whole_ changelog -                # again. -                # TODO: remove entry retries when it's gets fixed. -                logging.warn('incomplete sync, retrying changelog: %s' % change) -                time.sleep(0.5) -            self.turns += 1 +        tries = 0 +        retry = False -    def upd_stime(self, stime): +        while True: +            self.skipped_gfid_list = [] +            self.current_files_skipped_count = 0 + +            # first, fire all changelog transfers in parallel. entry and metadata +            # are performed synchronously, therefore in serial. However at the end +            # of each changelog, data is synchronized with syncdata_async() - which +            # means it is serial w.r.t entries/metadata of that changelog but +            # happens in parallel with data of other changelogs. + +            for change in changes: +                logging.debug('processing change %s' % change) +                self.process_change(change, done, retry) +                if not retry: +                    self.turns += 1 # number of changelogs processed in the batch + +            # Now we wait for all the data transfers fired off in the above step +            # to complete. Note that this is not ideal either. Ideally we want to +            # trigger the entry/meta-data transfer of the next batch while waiting +            # for the data transfer of the current batch to finish. + +            # Note that the reason to wait for the data transfer (vs doing it +            # completely in the background and call the changelog_done() +            # asynchronously) is because this waiting acts as a "backpressure" +            # and prevents a spiraling increase of wait stubs from consuming +            # unbounded memory and resources. + +            # update the slave's time with the timestamp of the _last_ changelog +            # file time suffix. Since, the changelog prefix time is the time when +            # the changelog was rolled over, introduce a tolerence of 1 second to +            # counter the small delta b/w the marker update and gettimeofday(). +            # NOTE: this is only for changelog mode, not xsync. + +            # @change is the last changelog (therefore max time for this batch) +            if self.syncdata_wait(): +                if done: +                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    self.upd_stime(xtl) +                    map(self.master.server.changelog_done, changes) +                self.update_worker_files_syncd() +                break + +            # We do not know which changelog transfer failed, retry everything. +            retry = True +            tries += 1 +            if tries == self.MAX_RETRIES: +                logging.warn('changelogs %s could not be processed - moving on...' % \ +                             ' '.join(map(os.path.basename, changes))) +                self.update_worker_total_files_skipped(self.current_files_skipped_count) +                logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) +                self.update_worker_files_syncd() +                if done: +                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    self.upd_stime(xtl) +                    map(self.master.server.changelog_done, changes) +                break +            # it's either entry_ops() or Rsync that failed to do it's +            # job. Mostly it's entry_ops() [which currently has a problem +            # of failing to create an entry but failing to return an errno] +            # Therefore we do not know if it's either Rsync or the freaking +            # entry_ops() that failed... so we retry the _whole_ changelog +            # again. +            # TODO: remove entry retries when it's gets fixed. +            logging.warn('incomplete sync, retrying changelogs: %s' % \ +                         ' '.join(map(os.path.basename, changes))) +            time.sleep(0.5) + +    def upd_stime(self, stime, path=None): +        if not path: +            path = self.FLAT_DIR_HIERARCHY          if not stime == URXTIME: -            self.sendmark(self.FLAT_DIR_HIERARCHY, stime) +            self.sendmark(path, stime) + +    def get_worker_status_file(self): +        file_name = gconf.local_path+'.status' +        file_name = file_name.replace("/", "_") +        worker_status_file = gconf.georep_session_working_dir+file_name +        return worker_status_file + +    def update_worker_status(self, key, value): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data[key] = value +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data[key] = value +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_cumilitive_status(self, files_pending): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['files_remaining']  = files_pending['count'] +                loaded_data['bytes_remaining']  = files_pending['bytes'] +                loaded_data['purges_remaining'] = files_pending['purge'] +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data['files_remaining']  = files_pending['count'] +                    default_data['bytes_remaining']  = files_pending['bytes'] +                    default_data['purges_remaining'] = files_pending['purge'] +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_remote_node (self): +        node = sys.argv[-1] +        node = node.split("@")[-1] +        remote_node_ip = node.split(":")[0] +        remote_node_vol = node.split(":")[3] +        remote_node = remote_node_ip + '::' + remote_node_vol +        self.update_worker_status ('remote_node', remote_node) + +    def update_worker_health (self, state): +        self.update_worker_status ('worker status', state) + +    def update_worker_crawl_status (self, state): +        self.update_worker_status ('crawl status', state) + +    def update_worker_files_syncd (self): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['files_syncd'] += loaded_data['files_remaining'] +                loaded_data['files_remaining']  = 0 +                loaded_data['bytes_remaining']  = 0 +                loaded_data['purges_remaining'] = 0 +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_files_remaining (self, state): +        self.update_worker_status ('files_remaining', state) + +    def update_worker_bytes_remaining (self, state): +        self.update_worker_status ('bytes_remaining', state) + +    def update_worker_purges_remaining (self, state): +        self.update_worker_status ('purges_remaining', state) + +    def update_worker_total_files_skipped (self, value): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['total_files_skipped'] = value +                loaded_data['files_remaining'] -= value +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data['total_files_skipped'] = value +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise      def crawl(self): +        self.update_worker_crawl_status("Changelog Crawl")          changes = [] +        # get stime (from the brick) and purge changelogs +        # that are _historical_ to that time. +        purge_time = self.xtime('.', self.slave) +        if isinstance(purge_time, int): +            purge_time = None          try:              self.master.server.changelog_scan()              self.crawls += 1          except OSError:              self.fallback_xsync() +            self.update_worker_crawl_status("Hybrid Crawl")          changes = self.master.server.changelog_getchanges()          if changes: -            xtl = self.xtime(self.FLAT_DIR_HIERARCHY) -            if isinstance(xtl, int): -                raise GsyncdError('master is corrupt') +            if purge_time: +                logging.info("slave's time: %s" % repr(purge_time)) +                processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] +                for pr in processed: +                    logging.info('skipping already processed change: %s...' % os.path.basename(pr)) +                    self.master.server.changelog_done(pr) +                    changes.remove(pr)              logging.debug('processing changes %s' % repr(changes))              self.process(changes) -            self.upd_stime(xtl)      def register(self):          (workdir, logfile) = self.setup_working_dir() @@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon):  class GMasterXsyncMixin(GMasterChangelogMixin):      """ -      This crawl needs to be xtime based (as of now      it's not. this is beacuse we generate CHANGELOG      file during each crawl which is then processed      by process_change()).      For now it's used as a one-shot initial sync      mechanism and only syncs directories, regular -    files and symlinks. +    files, hardlinks and symlinks.      """ +    XSYNC_MAX_ENTRIES = 1<<13 +      def register(self): +        self.counter = 0 +        self.comlist = []          self.sleep_interval = 60          self.tempdir = self.setup_working_dir()[0]          self.tempdir = os.path.join(self.tempdir, 'xsync') @@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              else:                  raise +    def crawl(self): +        """ +        event dispatcher thread + +        this thread dispatches either changelog or synchronizes stime. +        additionally terminates itself on recieving a 'finale' event +        """ +        def Xsyncer(): +            self.Xcrawl() +        t = Thread(target=Xsyncer) +        t.start() +        logging.info('starting hybrid crawl...') +        self.update_worker_crawl_status("Hybrid Crawl") +        while True: +            try: +                item = self.comlist.pop(0) +                if item[0] == 'finale': +                    logging.info('finished hybrid crawl syncing') +                    break +                elif item[0] == 'xsync': +                    logging.info('processing xsync changelog %s' % (item[1])) +                    self.process([item[1]], 0) +                elif item[0] == 'stime': +                    logging.debug('setting slave time: %s' % repr(item[1])) +                    self.upd_stime(item[1][1], item[1][0]) +                else: +                    logging.warn('unknown tuple in comlist (%s)' % repr(item)) +            except IndexError: +                time.sleep(1) +      def write_entry_change(self, prefix, data=[]):          self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      def fname(self):          return self.xsync_change -    def crawl(self, path='.', xtr=None, done=0): -        """ generate a CHANGELOG file consumable by process_change """ +    def put(self, mark, item): +        self.comlist.append((mark, item)) + +    def sync_xsync(self, last): +        """schedule a processing of changelog""" +        self.close() +        self.put('xsync', self.fname()) +        self.counter = 0 +        if not last: +            time.sleep(1) # make sure changelogs are 1 second apart +            self.open() + +    def sync_stime(self, stime=None, last=False): +        """schedule a stime synchronization""" +        if stime: +            self.put('stime', stime) +        if last: +            self.put('finale', None) + +    def sync_done(self, stime=None, last=False): +        self.sync_xsync(last) +        if stime: +            self.sync_stime(stime, last) + +    def Xcrawl(self, path='.', xtr_root=None): +        """ +        generate a CHANGELOG file consumable by process_change. + +        slave's xtime (stime) is _cached_ for comparisons across +        the filesystem tree, but set after directory synchronization. +        """          if path == '.':              self.open()              self.crawls += 1 -        if not xtr: +        if not xtr_root:              # get the root stime and use it for all comparisons -            xtr = self.xtime('.', self.slave) -            if isinstance(xtr, int): -                if xtr != ENOENT: -                    raise GsyncdError('slave is corrupt') -                xtr = self.minus_infinity +            xtr_root = self.xtime('.', self.slave) +            if isinstance(xtr_root, int): +                if xtr_root != ENOENT: +                    logging.warn("slave cluster not returning the " \ +                                 "correct xtime for root (%d)" % xtr_root) +                xtr_root = self.minus_infinity          xtl = self.xtime(path)          if isinstance(xtl, int): -            raise GsyncdError('master is corrupt') -        if xtr == xtl: +            logging.warn("master cluster's xtime not found") +        xtr = self.xtime(path, self.slave) +        if isinstance(xtr, int): +            if xtr != ENOENT: +                logging.warn("slave cluster not returning the " \ +                             "correct xtime for %s (%d)" % (path, xtr)) +            xtr = self.minus_infinity +        xtr = max(xtr, xtr_root) +        if not self.need_sync(path, xtl, xtr):              if path == '.': -                self.close() +                self.sync_done((path, xtl), True)              return          self.xtime_reversion_hook(path, xtl, xtr)          logging.debug("entering " + path) @@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          for e in dem:              bname = e              e = os.path.join(path, e) -            st = lstat(e) +            xte = self.xtime(e) +            if isinstance(xte, int): +                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) +                continue +            if not self.need_sync(e, xte, xtr): +                continue +            st = self.master.server.lstat(e)              if isinstance(st, int): -                logging.warn('%s got purged in the interim..' % e) +                logging.warn('%s got purged in the interim ...' % e)                  continue              gfid = self.master.server.gfid(e)              if isinstance(gfid, int): -                logging.warn('skipping entry %s..' % (e)) -                continue -            xte = self.xtime(e) -            if isinstance(xte, int): -                raise GsyncdError('master is corrupt') -            if not self.need_sync(e, xte, xtr): +                logging.warn('skipping entry %s..' % e)                  continue              mo = st.st_mode +            self.counter += 1 +            if self.counter == self.XSYNC_MAX_ENTRIES: +                self.sync_done()              if stat.S_ISDIR(mo): -                self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) -                self.crawl(e, xtr) +                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) +                self.Xcrawl(e, xtr_root) +                self.sync_done((e, xte), False)              elif stat.S_ISLNK(mo): -                rl = errno_wrap(os.readlink, [en], [ENOENT]) -                if isinstance(rl, int): -                    continue -                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl]) -            else: +                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) +            elif stat.S_ISREG(mo): +                nlink = st.st_nlink +                nlink -= 1 # fixup backend stat link count                  # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave                  # side will decide if to create the new entry, or to create link. -                if st.st_nlink == 1: -                    self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))]) +                if nlink == 1: +                    self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])                  else:                      self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) -                if stat.S_ISREG(mo): -                    self.write_entry_change("D", [gfid]) - +                self.write_entry_change("D", [gfid])          if path == '.': -            logging.info('processing xsync changelog %s' % self.fname()) -            self.close() -            self.process([self.fname()], done) -            self.upd_stime(xtl) +            self.sync_done((path, xtl), True)  class BoxClosedErr(Exception):      pass @@ -979,12 +1326,13 @@ class Syncer(object):      each completed syncjob.      """ -    def __init__(self, slave): +    def __init__(self, slave, sync_engine, resilient_errnos=[]):          """spawn worker threads"""          self.slave = slave          self.lock = Lock()          self.pb = PostBox() -        self.bytes_synced = 0 +        self.sync_engine = sync_engine +        self.errnos_ok = resilient_errnos          for i in range(int(gconf.sync_jobs)):              t = Thread(target=self.syncjob)              t.start() @@ -1002,11 +1350,10 @@ class Syncer(object):                      break                  time.sleep(0.5)              pb.close() -            po = self.slave.rsync(pb) +            po = self.sync_engine(pb)              if po.returncode == 0:                  ret = (True, 0) -            elif po.returncode in (23, 24): -                # partial transfer (cf. rsync(1)), that's normal +            elif po.returncode in self.errnos_ok:                  ret = (False, po.returncode)              else:                  po.errfail()  | 
