 geo-replication/syncdaemon/Makefile.am     |   2
 geo-replication/syncdaemon/changelogsdb.py | 111
 geo-replication/syncdaemon/gsyncd.py       |  22
 geo-replication/syncdaemon/gsyncdstatus.py |   2
 geo-replication/syncdaemon/master.py       | 504
 geo-replication/syncdaemon/resource.py     |  54
 geo-replication/syncdaemon/syncdutils.py   |   7
 7 files changed, 516 insertions(+), 186 deletions(-)
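At a high level, this patch replaces the per-batch in-memory datas/meta_gfid sets in master.py with a temporary sqlite database (the new changelogsdb.py), so data and metadata GFIDs are de-duplicated on insert and handed to rsync in pages instead of being held in memory. The following is a minimal sketch of the intended calling pattern using only the helpers added below; the db path, timestamps, and GFIDs are illustrative placeholders, and a_syncdata() is only referenced in a comment.

    # Sketch: how master.py is expected to drive the new changelogsdb helpers.
    from changelogsdb import (db_init, db_record_data, db_record_meta,
                              db_delete_meta_if_exists_in_data, db_commit,
                              db_get_data)

    DB_PAGINATION_SIZE_DATA = 1000            # matches the constant added in master.py

    db_init("/tmp/temp_changelogs.db")        # fresh temp db per changelog batch

    # While parsing changelog records, store GFIDs keyed by changelog timestamp;
    # the PRIMARY KEY ... ON CONFLICT IGNORE clause de-duplicates repeats.
    db_record_data(".gfid/aaaa-1111", "1454934000")
    db_record_meta(".gfid/aaaa-1111", "1454934000")

    # rsync/tar preserve permissions, so META rows already queued as DATA are dropped.
    db_delete_meta_if_exists_in_data()
    db_commit()

    # Later, feed rsync page by page instead of queueing every GFID at once.
    offset = 0
    while True:
        datas = db_get_data(start="1454934000", end="1454937600",
                            limit=DB_PAGINATION_SIZE_DATA, offset=offset)
        if not datas:
            break
        offset += DB_PAGINATION_SIZE_DATA
        # self.a_syncdata(datas) would hand this page to the rsync postbox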
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index ed0f5e40924..ce875bdacb6 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon  syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \  	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \  	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ -	gsyncdstatus.py +	gsyncdstatus.py changelogsdb.py  CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogsdb.py b/geo-replication/syncdaemon/changelogsdb.py new file mode 100644 index 00000000000..7e64158e7af --- /dev/null +++ b/geo-replication/syncdaemon/changelogsdb.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import sqlite3 +from errno import ENOENT + +conn = None +cursor = None + + +def db_commit(): +    conn.commit() + + +def db_init(db_path): +    global conn, cursor +    # Remove Temp Db +    try: +        os.unlink(db_path) +        os.unlink(db_path + "-journal") +    except OSError as e: +        if e.errno != ENOENT: +            raise + +    conn = sqlite3.connect(db_path) +    cursor = conn.cursor() +    cursor.execute("DROP TABLE IF EXISTS data") +    cursor.execute("DROP TABLE IF EXISTS meta") +    query = """CREATE TABLE IF NOT EXISTS data( +    gfid           VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, +    changelog_time VARCHAR(100) +    )""" +    cursor.execute(query) + +    query = """CREATE TABLE IF NOT EXISTS meta( +    gfid           VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, +    changelog_time VARCHAR(100) +    )""" +    cursor.execute(query) + + +def db_record_data(gfid, changelog_time): +    query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)" +    cursor.execute(query, (gfid, changelog_time)) + + +def db_record_meta(gfid, changelog_time): +    query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)" +    cursor.execute(query, (gfid, changelog_time)) + + +def db_remove_meta(gfid): +    query = "DELETE FROM meta WHERE gfid = ?" +    cursor.execute(query, (gfid, )) + + +def db_remove_data(gfid): +    query = "DELETE FROM data WHERE gfid = ?" +    cursor.execute(query, (gfid, )) + + +def db_get_data(start, end, limit, offset): +    query = """SELECT gfid FROM data WHERE changelog_time +    BETWEEN ? AND ? LIMIT ? OFFSET ?""" +    cursor.execute(query, (start, end, limit, offset)) +    out = [] +    for row in cursor: +        out.append(row[0]) + +    return out + + +def db_get_meta(start, end, limit, offset): +    query = """SELECT gfid FROM meta WHERE changelog_time +    BETWEEN ? AND ? LIMIT ? 
OFFSET ?""" +    cursor.execute(query, (start, end, limit, offset)) +    out = [] +    for row in cursor: +        out.append(row[0]) + +    return out + + +def db_delete_meta_if_exists_in_data(): +    query = """ +    DELETE FROM meta WHERE gfid in +    (SELECT M.gfid +     FROM meta M INNER JOIN data D +     ON M.gfid = D.gfid) +    """ +    cursor.execute(query) + + +def db_get_data_count(): +    query = "SELECT COUNT(gfid) FROM data" +    cursor.execute(query) +    return cursor.fetchone()[0] + + +def db_get_meta_count(): +    query = "SELECT COUNT(gfid) FROM meta" +    cursor.execute(query) +    return cursor.fetchone()[0] diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index c2699a183ae..918bee0ce1c 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -274,6 +274,28 @@ def main_i():      op.add_option('--sync-acls', default=True, action='store_true')      op.add_option('--log-rsync-performance', default=False,                    action='store_true') +    op.add_option('--max-rsync-retries', type=int, default=10) + +    # This is for stime granularity, Bigger batch will be split into +    # multiple data batches, On failure it will start from this point +    # Default value is 1 day changelogs +    # (4 * 60 * 24 = 5760) +    #    4 changelogs per minute +    #   60 min per hr +    #   24 hrs per day +    op.add_option('--max-data-changelogs-in-batch', type=int, default=5760) + +    # While processing Historical Changelogs above BATCH SIZE is not considered +    # since all Changelogs to be post processed once, Batching it makes more +    # rsync retries. (4 * 60 * 24 * 15 = 86400) +    #    4 changelogs per minute +    #   60 min per hr +    #   24 hrs per day +    #   15 days +    # This means 15 days changelogs can be processed at once in case of +    # History scan +    op.add_option('--max-history-changelogs-in-batch', type=int, default=86400) +      op.add_option('--pause-on-start', default=False, action='store_true')      op.add_option('-L', '--log-level', metavar='LVL')      op.add_option('-r', '--remote-gsyncd', metavar='CMD', diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index 88398e2ce8a..beacd7473b2 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -52,6 +52,7 @@ def get_default_values():          "slave_node": DEFAULT_STATUS,          "worker_status": DEFAULT_STATUS,          "last_synced": 0, +        "last_synced_entry": 0,          "crawl_status": DEFAULT_STATUS,          "entry": 0,          "data": 0, @@ -239,6 +240,7 @@ class GeorepStatus(object):          slave_node                 N/A        VALUE    VALUE       N/A          status                     Created    VALUE    Paused      Stopped          last_synced                N/A        VALUE    VALUE       VALUE +        last_synced_entry          N/A        VALUE    VALUE       VALUE          crawl_status               N/A        VALUE    N/A         N/A          entry                      N/A        VALUE    N/A         N/A          data                       N/A        VALUE    N/A         N/A diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 80c4d9d8b95..3df08e41a13 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,7 +25,13 @@ from gconf import gconf  from syncdutils import Thread, GsyncdError, boolify, escape  from 
syncdutils import unescape, gauxpfx, md5hex, selfkill  from syncdutils import lstat, errno_wrap, FreeObject -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable +from changelogsdb import db_init, db_record_data, db_record_meta +from changelogsdb import db_remove_data, db_remove_meta +from changelogsdb import db_get_data, db_get_meta, db_commit +from changelogsdb import db_get_data_count, db_get_meta_count +from changelogsdb import db_delete_meta_if_exists_in_data +  URXTIME = (-1, 0) @@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15  # that batch since stime will get updated after each batch.  MAX_CHANGELOG_BATCH_SIZE = 727040 +# Number of record to query once +DB_PAGINATION_SIZE_META = 100 +DB_PAGINATION_SIZE_DATA = 1000 +  # Utility functions to help us to get to closer proximity  # of the DRY principle (no, don't look for elevated or  # perspectivistic things here) @@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self):      return volinfo_sys +def edct(op, **ed): +    dct = {} +    dct['op'] = op +    for k in ed: +        if k == 'stat': +            st = ed[k] +            dst = dct['stat'] = {} +            if st: +                dst['uid'] = st.st_uid +                dst['gid'] = st.st_gid +                dst['mode'] = st.st_mode +                dst['atime'] = st.st_atime +                dst['mtime'] = st.st_mtime +        else: +            dct[k] = ed[k] +    return dct + +  # The API!  def gmaster_builder(excrawl=None): @@ -259,7 +287,7 @@ class TarSSHEngine(object):                      st = lstat(se)                      if isinstance(st, int):                          # file got unlinked in the interim -                        self.unlinked_gfids.add(se) +                        db_remove_data(se)                          return True              self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -294,7 +322,7 @@ class RsyncEngine(object):                      st = lstat(se)                      if isinstance(st, int):                          # file got unlinked in the interim -                        self.unlinked_gfids.add(se) +                        db_remove_data(se)                          return True              self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -340,6 +368,18 @@ class GMasterCommon(object):          if self.volinfo:              return self.volinfo['volume_mark'] +    def get_entry_stime(self): +        data = self.slave.server.entry_stime(".", self.uuid) +        if isinstance(data, int): +            data = None +        return data + +    def get_data_stime(self): +        data = self.slave.server.stime(".", self.uuid) +        if isinstance(data, int): +            data = None +        return data +      def xtime(self, path, *a, **opts):          """get amended xtime @@ -387,7 +427,6 @@ class GMasterCommon(object):          self.volinfo = None          self.terminate = False          self.sleep_interval = 1 -        self.unlinked_gfids = set()      def init_keep_alive(cls):          """start the keep-alive thread """ @@ -553,7 +592,7 @@ class GMasterCommon(object):                              self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)                          # Purge all changelogs available in processing dir                          # less than cluster_stime -                        proc_dir = os.path.join(self.setup_working_dir(), +                        proc_dir = os.path.join(self.tempdir,                    
                              ".processing")                          if os.path.exists(proc_dir): @@ -627,6 +666,11 @@ class GMasterCommon(object):              ret = j[-1]()              if not ret:                  succeed = False + +        # All the unlinked GFIDs removed from Data and Meta list +        # Commit the Transaction +        db_commit() +          if succeed and not args[0] is None:              self.sendmark(path, *args)          return succeed @@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon):      # flat directory hierarchy for gfid based access      FLAT_DIR_HIERARCHY = '.' -    # maximum retries per changelog before giving up -    MAX_RETRIES = 10 -      CHANGELOG_LOG_LEVEL = 9      CHANGELOG_CONN_RETRIES = 5 @@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon):          logging.debug('changelog working dir %s' % workdir)          return workdir -    def get_purge_time(self): -        purge_time = self.xtime('.', self.slave) -        if isinstance(purge_time, int): -            purge_time = None -        return purge_time +    def log_failures(self, failures, entry_key, gfid_prefix, log_prefix): +        num_failures = 0 +        for failure in failures: +            st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) +            if not isinstance(st, int): +                num_failures += 1 +                logging.error('%s FAILED: %s' % (log_prefix, +                                                 repr(failure))) + +        self.status.inc_value("failures", num_failures)      def process_change(self, change, done, retry):          pfx = gauxpfx()          clist = []          entries = [] -        meta_gfid = set() -        datas = set() +        change_ts = change.split(".")[-1] + +        # self.data_batch_start is None only in beginning and during +        # new batch start +        if self.data_batch_start is None: +            self.data_batch_start = change_ts + +        # Ignore entry ops which are already processed in Changelog modes +        ignore_entry_ops = False +        entry_stime = None +        data_stime = None +        if self.name in ["live_changelog", "history_changelog"]: +            entry_stime = self.get_entry_stime() +            data_stime = self.get_data_stime() + +        if entry_stime is not None and data_stime is not None: +            # if entry_stime is not None but data_stime > entry_stime +            # This situation is caused by the stime update of Passive worker +            # Consider data_stime in this case. 
+            if data_stime[0] > entry_stime[0]: +                entry_stime = data_stime + +            # Compare the entry_stime with changelog file suffix +            # if changelog time is less than entry_stime then ignore +            if int(change_ts) <= entry_stime[0]: +                ignore_entry_ops = True -        # basic crawl stats: files and bytes -        files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}          try:              f = open(change, "r")              clist = f.readlines() @@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon):          except IOError:              raise -        def edct(op, **ed): -            dct = {} -            dct['op'] = op -            for k in ed: -                if k == 'stat': -                    st = ed[k] -                    dst = dct['stat'] = {} -                    if st: -                        dst['uid'] = st.st_uid -                        dst['gid'] = st.st_gid -                        dst['mode'] = st.st_mode -                        dst['atime'] = st.st_atime -                        dst['mtime'] = st.st_mtime -                else: -                    dct[k] = ed[k] -            return dct - -        # entry counts (not purges) -        def entry_update(): -            files_pending['count'] += 1 - -        # purge count -        def purge_update(): -            files_pending['purge'] += 1 - -        def log_failures(failures, entry_key, gfid_prefix, log_prefix): -            num_failures = 0 -            for failure in failures: -                st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) -                if not isinstance(st, int): -                    num_failures += 1 -                    logging.error('%s FAILED: %s' % (log_prefix, -                                                     repr(failure))) - -            self.status.inc_value("failures", num_failures) -          for e in clist:              e = e.strip()              et = e[self.IDX_START:self.IDX_END]   # entry type @@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon):              # skip ENTRY operation if hot tier brick              if self.name == 'live_changelog' or \ -                self.name == 'history_changelog': +               self.name == 'history_changelog':                  if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:                      logging.debug('skip ENTRY op: %s if hot tier brick'                                    % (ec[self.POS_TYPE]))                      continue +            # Data and Meta operations are decided while parsing +            # UNLINK/RMDIR/MKNOD except that case ignore all the other +            # entry ops if ignore_entry_ops is True. +            # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end +            if ignore_entry_ops and et == self.TYPE_ENTRY and \ +               ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]: +                continue +              if et == self.TYPE_ENTRY:                  # extract information according to the type of                  # the entry operation. 
create(), mkdir() and mknod() @@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon):                      # Remove from DATA list, so that rsync will                      # not fail                      pt = os.path.join(pfx, ec[0]) -                    if pt in datas: -                        datas.remove(pt) +                    st = lstat(pt) +                    if isinstance(st, int): +                        # file got unlinked, May be historical Changelog +                        db_remove_data(pt) +                        db_remove_meta(pt)                      if not boolify(gconf.ignore_deletes): -                        purge_update() -                        entries.append(edct(ty, gfid=gfid, entry=en)) +                        if not ignore_entry_ops: +                            entries.append(edct(ty, gfid=gfid, entry=en))                  elif ty in ['CREATE', 'MKDIR', 'MKNOD']: -                    entry_update() -                      # Special case: record mknod as link                      if ty in ['MKNOD']:                          mode = int(ec[2]) @@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon):                                  # CREATED if source not exists.                                  entries.append(edct('LINK', stat=st, entry=en,                                                 gfid=gfid)) -                                  # Here, we have the assumption that only                                  # tier-gfid.linkto causes this mknod. Add data -                                datas.add(os.path.join(pfx, ec[0])) +                                db_record_data(os.path.join(pfx, ec[0]), +                                               change_ts)                                  continue                      # stat info. present in the changelog itself @@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon):                          if isinstance(rl, int):                              rl = None -                    entry_update()                      e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))                      entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,                                          stat=st, link=rl)) @@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon):                          continue                      if ty == 'LINK': -                        entry_update()                          entries.append(edct(ty, stat=st, entry=en, gfid=gfid))                      elif ty == 'SYMLINK':                          rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])                          if isinstance(rl, int):                              continue -                        entry_update() +                          entries.append(                              edct(ty, stat=st, entry=en, gfid=gfid, link=rl))                      else:                          logging.warn('ignoring %s [op %s]' % (gfid, ty))              elif et == self.TYPE_GFID: -                # If self.unlinked_gfids is available, then that means it is -                # retrying the changelog second time. 
Do not add the GFID's -                # to rsync job if failed previously but unlinked in master -                if self.unlinked_gfids and \ -                   os.path.join(pfx, ec[0]) in self.unlinked_gfids: -                    logging.debug("ignoring data, since file purged interim") -                else: -                    datas.add(os.path.join(pfx, ec[0])) +                db_record_data(os.path.join(pfx, ec[0]), change_ts)              elif et == self.TYPE_META:                  if ec[1] == 'SETATTR':  # only setattr's for now... -                    if len(ec) == 5: -                        # In xsync crawl, we already have stat data -                        # avoid doing stat again -                        meta_gfid.add((os.path.join(pfx, ec[0]), -                                       XCrawlMetadata(st_uid=ec[2], -                                                      st_gid=ec[3], -                                                      st_mode=ec[4], -                                                      st_atime=ec[5], -                                                      st_mtime=ec[6]))) -                    else: -                        meta_gfid.add((os.path.join(pfx, ec[0]), )) +                    db_record_meta(os.path.join(pfx, ec[0]), change_ts)                  elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \                       ec[1] == 'FXATTROP':                      # To sync xattr/acls use rsync/tar, --xattrs and --acls                      # switch to rsync and tar                      if not boolify(gconf.use_tarssh) and \                         (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): -                        datas.add(os.path.join(pfx, ec[0])) +                        db_record_data(os.path.join(pfx, ec[0]), change_ts)              else:                  logging.warn('got invalid changelog type: %s' % (et))          logging.debug('entries: %s' % repr(entries)) -        # Increment counters for Status -        self.status.inc_value("entry", len(entries)) -        self.files_in_batch += len(datas) -        self.status.inc_value("data", len(datas)) -          # sync namespace -        if entries: +        if entries and not ignore_entry_ops: +            # Increment counters for Status +            self.status.inc_value("entry", len(entries))              failures = self.slave.server.entry_ops(entries) -            log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') +            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')              self.status.dec_value("entry", len(entries)) -        # sync metadata -        if meta_gfid: -            meta_entries = [] -            for go in meta_gfid: -                if len(go) > 1: -                    st = go[1] -                else: -                    st = lstat(go[0]) -                if isinstance(st, int): -                    logging.debug('file %s got purged in the interim' % go[0]) -                    continue -                meta_entries.append(edct('META', go=go[0], stat=st)) -            if meta_entries: -                self.status.inc_value("meta", len(entries)) -                failures = self.slave.server.meta_ops(meta_entries) -                log_failures(failures, 'go', '', 'META') -                self.status.dec_value("meta", len(entries)) - -        # sync data -        if datas: +            # Update Entry stime in Brick Root only in case of Changelog mode +            if self.name in ["live_changelog", "history_changelog"]: +                
entry_stime_to_update = (int(change_ts) - 1, 0) +                self.upd_entry_stime(entry_stime_to_update) +                self.status.set_field("last_synced_entry", +                                      entry_stime_to_update[0]) + +        if ignore_entry_ops: +            # Book keeping, to show in logs the range of Changelogs skipped +            self.num_skipped_entry_changelogs += 1 +            if self.skipped_entry_changelogs_first is None: +                self.skipped_entry_changelogs_first = change_ts + +            self.skipped_entry_changelogs_last = change_ts + +        # Batch data based on number of changelogs as configured as +        # gconf.max_data_changelogs_in_batch(Default is 24 hrs) +        # stime will be set after completion of these batch, so on failure +        # Geo-rep will progress day by day +        if (self.num_changelogs > gconf.max_data_changelogs_in_batch): +            # (Start Changelog TS, End Changelog TS, [Changes]) +            self.data_batches.append([self.data_batch_start, change_ts, +                                      [change]]) +            self.data_batch_start = None +            self.num_changelogs = 0 +        else: +            self.data_batches[-1][1] = change_ts +            self.data_batches[-1][2].append(change) + +    def datas_to_queue(self, start, end): +        # Paginate db entries and add it to Rsync PostBox +        offset = 0 +        total_datas = 0 +        while True: +            # Db Pagination +            datas = db_get_data(start=start, end=end, +                                limit=DB_PAGINATION_SIZE_DATA, +                                offset=offset) +            if len(datas) == 0: +                break +            offset += DB_PAGINATION_SIZE_DATA +            total_datas += len(datas)              self.a_syncdata(datas) -            self.datas_in_batch.update(datas) +        return total_datas -    def process(self, changes, done=1): -        tries = 0 +    def handle_data_sync(self, start, end, changes, done, total_datas): +        """ +        Wait till all rsync jobs are complete, also handle the retries +        Update data stime Once Rsync jobs are complete. +        """          retry = False -        self.unlinked_gfids = set() -        self.files_in_batch = 0 -        self.datas_in_batch = set() +        tries = 0 +        # Error log disabled till the last round          self.syncer.disable_errorlog()          while True: -            # first, fire all changelog transfers in parallel. entry and -            # metadata are performed synchronously, therefore in serial. -            # However at the end of each changelog, data is synchronized -            # with syncdata_async() - which means it is serial w.r.t -            # entries/metadata of that changelog but happens in parallel -            # with data of other changelogs. -              if retry: -                if tries == (self.MAX_RETRIES - 1): -                    # Enable Error logging if it is last retry -                    self.syncer.enable_errorlog() - -                # Remove Unlinked GFIDs from Queue -                for unlinked_gfid in self.unlinked_gfids: -                    if unlinked_gfid in self.datas_in_batch: -                        self.datas_in_batch.remove(unlinked_gfid) - -                # Retry only Sync. 
Do not retry entry ops -                if self.datas_in_batch: -                    self.a_syncdata(self.datas_in_batch) -            else: -                for change in changes: -                    logging.debug('processing change %s' % change) -                    self.process_change(change, done, retry) -                    if not retry: -                        # number of changelogs processed in the batch -                        self.turns += 1 +                self.datas_to_queue(start, end) + +            if retry and tries == (gconf.max_rsync_retries - 1): +                # Enable Error logging if it is last retry +                self.syncer.enable_errorlog()              # Now we wait for all the data transfers fired off in the above              # step to complete. Note that this is not ideal either. Ideally @@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon):              # @change is the last changelog (therefore max time for this batch)              if self.syncdata_wait(): -                self.unlinked_gfids = set()                  if done: -                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    xtl = (int(changes[-1].split('.')[-1]) - 1, 0)                      self.upd_stime(xtl)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes)                  # Reset Data counter after sync -                self.status.dec_value("data", self.files_in_batch) -                self.files_in_batch = 0 -                self.datas_in_batch = set() +                self.status.dec_value("data", total_datas)                  break              # We do not know which changelog transfer failed, retry everything.              retry = True              tries += 1 -            if tries == self.MAX_RETRIES: +            if tries >= gconf.max_rsync_retries:                  logging.error('changelogs %s could not be processed '                                'completely - moving on...' %                                ' '.join(map(os.path.basename, changes)))                  # Reset data counter on failure -                self.status.dec_value("data", self.files_in_batch) -                self.files_in_batch = 0 -                self.datas_in_batch = set() +                self.status.dec_value("data", total_datas)                  if done: -                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    xtl = (int(changes[-1].split('.')[-1]) - 1, 0)                      self.upd_stime(xtl)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes)                  break +              # it's either entry_ops() or Rsync that failed to do it's              # job. 
Mostly it's entry_ops() [which currently has a problem              # of failing to create an entry but failing to return an errno] @@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon):              logging.warn('incomplete sync, retrying changelogs: %s' %                           ' '.join(map(os.path.basename, changes))) -            # Reset the Data counter before Retry -            self.status.dec_value("data", self.files_in_batch) -            self.files_in_batch = 0              time.sleep(0.5) +        # Log Current batch details +        if changes: +            logging.info( +                "{0} mode completed in {1:.4f} seconds " +                "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format( +                    self.name, +                    time.time() - self.batch_start_time, +                    changes[0].split("/")[-1], +                    changes[-1].split("/")[-1], +                    len(changes), +                    repr(self.get_data_stime()), +                    repr(self.get_entry_stime()))) + +    def process(self, changes, done=1): +        retry = False +        first_changelog_ts = changes[0].split(".")[-1] + +        db_init(os.path.join(self.tempdir, "temp_changelogs.db")) + +        self.skipped_entry_changelogs_first = None +        self.skipped_entry_changelogs_last = None +        self.num_skipped_entry_changelogs = 0 +        self.batch_start_time = time.time() +        # (Start Changelog TS, End Changelog TS, [Changes]) +        self.data_batches = [[first_changelog_ts, first_changelog_ts, []]] +        self.data_batch_start = None +        self.num_changelogs = 0 + +        for change in changes: +            logging.debug('processing change %s' % change) +            self.process_change(change, done, retry) +            # number of changelogs processed in the batch +            self.turns += 1 + +        # Rsync/Tar will preserve permissions, so if a GFID exists +        # in data queue then it syncs meta details too. 
Remove +        # all meta from meta table if exists in data table +        db_delete_meta_if_exists_in_data() + +        # All the Data/Meta populated, Commit the Changes in Db +        db_commit() + +        # Log the Skipped Entry ops range if any +        if self.skipped_entry_changelogs_first is not None and \ +           self.skipped_entry_changelogs_last is not None: +            logging.info("Skipping already processed entry " +                         "ops from CHANGELOG.{0} to CHANGELOG.{1} " +                         "Num: {2}".format( +                             self.skipped_entry_changelogs_first, +                             self.skipped_entry_changelogs_last, +                             self.num_skipped_entry_changelogs)) + +        # Entry Changelogs syncing finished +        logging.info("Syncing Entries completed in {0:.4f} seconds " +                     "CHANGELOG.{1} - CHANGELOG.{2} " +                     "Num: {3}".format( +                         time.time() - self.batch_start_time, +                         changes[0].split(".")[-1], +                         changes[-1].split(".")[-1], +                         len(changes))) + +        # Update Status Data and Meta Count +        self.status.inc_value("data", db_get_data_count()) +        self.status.inc_value("meta", db_get_meta_count()) + +        for b in self.data_batches: +            # Add to data Queue, so that Rsync will start parallelly +            # while syncing Meta ops +            total_datas = self.datas_to_queue(b[0], b[1]) + +            # Sync Meta +            offset = 0 +            while True: +                # Db Pagination +                meta_gfids = db_get_meta(start=b[0], end=b[1], +                                         limit=DB_PAGINATION_SIZE_META, +                                         offset=offset) +                if len(meta_gfids) == 0: +                    break +                offset += DB_PAGINATION_SIZE_META + +                # Collect required information for GFIDs which +                # exists in Master +                meta_entries = [] +                for go in meta_gfids: +                    st = lstat(go) +                    if isinstance(st, int): +                        logging.debug('file %s got purged in the ' +                                      'interim' % go) +                        continue +                    meta_entries.append(edct('META', go=go, stat=st)) + +                if meta_entries: +                    failures = self.slave.server.meta_ops(meta_entries) +                    self.log_failures(failures, 'go', '', 'META') +                    self.status.dec_value("meta", len(meta_entries)) + +            # Sync Data, Rsync already started syncing the files +            # wait for the completion and retry if required. 
+            self.handle_data_sync(b[0], b[1], b[2], done, total_datas) + +    def upd_entry_stime(self, stime): +        self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, +                                          self.uuid, +                                          stime) +      def upd_stime(self, stime, path=None):          if not path:              path = self.FLAT_DIR_HIERARCHY @@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon):          remote_node_ip = node.split(":")[0]          self.status.set_slave_node(remote_node_ip) -    def changelogs_batch_process(self, changes): +    def changelogs_batch_process(self, changes, single_batch=False): +        if single_batch and changes: +            logging.debug('processing changes %s' % repr(changes)) +            self.process(changes) +            return +          changelogs_batches = []          current_size = 0          for c in changes: @@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon):          changes = []          # get stime (from the brick) and purge changelogs          # that are _historical_ to that time. -        purge_time = self.get_purge_time() +        data_stime = self.get_data_stime()          self.changelog_agent.scan()          self.crawls += 1          changes = self.changelog_agent.getchanges()          if changes: -            if purge_time: -                logging.info("slave's time: %s" % repr(purge_time)) +            if data_stime:                  processed = [x for x in changes -                             if int(x.split('.')[-1]) < purge_time[0]] +                             if int(x.split('.')[-1]) < data_stime[0]]                  for pr in processed:                      logging.info(                          'skipping already processed change: %s...' % @@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon):          self.changelog_agent = changelog_agent          self.sleep_interval = int(gconf.change_interval)          self.changelog_done_func = self.changelog_agent.done -        self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), +        self.tempdir = self.setup_working_dir() +        self.processed_changelogs_dir = os.path.join(self.tempdir,                                                       ".processed")          self.name = "live_changelog"          self.status = status @@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          self.history_crawl_start_time = register_time          self.changelog_done_func = self.changelog_agent.history_done          self.history_turns = 0 -        self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), +        self.tempdir = self.setup_working_dir() +        self.processed_changelogs_dir = os.path.join(self.tempdir,                                                       ".history/.processed")          self.name = "history_changelog"          self.status = status @@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):      def crawl(self):          self.history_turns += 1          self.status.set_worker_crawl_status("History Crawl") -        purge_time = self.get_purge_time() +        data_stime = self.get_data_stime()          end_time = int(time.time()) -        logging.info('starting history crawl... turns: %s, stime: %s, etime: %s' -                     % (self.history_turns, repr(purge_time), repr(end_time))) +        logging.info('starting history crawl... 
turns: %s, stime: %s, ' +                     'etime: %s, entry_stime: %s' +                     % (self.history_turns, repr(data_stime), +                        repr(end_time), self.get_entry_stime())) -        if not purge_time or purge_time == URXTIME: +        if not data_stime or data_stime == URXTIME:              logging.info("stime not available, abandoning history crawl") -            raise NoPurgeTimeAvailable() +            raise NoStimeAvailable()          # Changelogs backend path is hardcoded as          # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different @@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):                                        ".glusterfs/changelogs")          ret, actual_end = self.changelog_agent.history(              changelog_path, -            purge_time[0], +            data_stime[0],              end_time,              int(gconf.sync_jobs)) @@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          # to be processed. returns positive value as number of changelogs          # to be processed, which will be fetched using          # history_getchanges() -        while self.changelog_agent.history_scan() > 0: +        num_scanned_changelogs = self.changelog_agent.history_scan() +        num_changelogs = num_scanned_changelogs +        changes = [] +        while num_scanned_changelogs > 0:              self.crawls += 1 -            changes = self.changelog_agent.history_getchanges() +            changes += self.changelog_agent.history_getchanges()              if changes: -                if purge_time: -                    logging.info("slave's time: %s" % repr(purge_time)) +                if data_stime:                      processed = [x for x in changes -                                 if int(x.split('.')[-1]) < purge_time[0]] +                                 if int(x.split('.')[-1]) < data_stime[0]]                      for pr in processed:                          logging.info('skipping already processed change: '                                       '%s...' 
% os.path.basename(pr))                          self.changelog_done_func(pr)                          changes.remove(pr) -            self.changelogs_batch_process(changes) +            if num_changelogs > gconf.max_history_changelogs_in_batch: +                self.changelogs_batch_process(changes, single_batch=True) +                num_changelogs = 0 +                changes = [] + +            num_scanned_changelogs = self.changelog_agent.history_scan() +            num_changelogs += num_scanned_changelogs + +        # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH +        # condition above +        if changes: +            self.changelogs_batch_process(changes, single_batch=True)          history_turn_time = int(time.time()) - self.history_crawl_start_time -        logging.info('finished history crawl syncing, endtime: %s, stime: %s' -                     % (actual_end, repr(self.get_purge_time()))) +        logging.info('finished history crawl syncing, endtime: %s, ' +                     'stime: %s, entry_stime: %s' +                     % (actual_end, repr(self.get_data_stime()), +                        self.get_entry_stime()))          # If TS returned from history_changelog is < register_time          # then FS crawl may be required, since history is only available @@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          t = Thread(target=Xsyncer)          t.start()          logging.info('starting hybrid crawl..., stime: %s' -                     % repr(self.get_purge_time())) +                     % repr(self.get_data_stime()))          self.status.set_worker_crawl_status("Hybrid Crawl")          while True:              try:                  item = self.comlist.pop(0)                  if item[0] == 'finale':                      logging.info('finished hybrid crawl syncing, stime: %s' -                                 % repr(self.get_purge_time())) +                                 % repr(self.get_data_stime()))                      break                  elif item[0] == 'xsync':                      logging.info('processing xsync changelog %s' % (item[1])) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index ed0e7efe2b2..91ca1916f6a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -31,11 +31,10 @@ import shutil  from gconf import gconf  import repce  from repce import RepceServer, RepceClient -from master import gmaster_builder  import syncdutils  from syncdutils import GsyncdError, select, privileged, boolify, funcode  from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException, ChangelogHistoryNotAvailable  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from gsyncdstatus import GeorepStatus @@ -522,6 +521,29 @@ class Server(object):                  raise      @classmethod +    @_pathguard +    def entry_stime(cls, path, uuid): +        """ +        entry_stime xattr to reduce the number of retry of Entry changes when +        Geo-rep worker crashes and restarts. entry_stime is updated after +        processing every changelog file. On failure and restart, worker only +        have to reprocess the last changelog for Entry ops. 
+        Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime +        """ +        try: +            val = Xattr.lgetxattr(path, +                                  '.'.join([cls.GX_NSPACE, uuid, +                                            'entry_stime']), +                                  8) +            return struct.unpack('!II', val) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno in (ENOENT, ENODATA, ENOTDIR): +                return ex.errno +            else: +                raise + +    @classmethod      def node_uuid(cls, path='.'):          try:              uuid_l = Xattr.lgetxattr_buf( @@ -542,6 +564,16 @@ class Server(object):      @classmethod      @_pathguard +    def set_entry_stime(cls, path, uuid, mark): +        """set @mark as stime for @uuid on @path""" +        errno_wrap(Xattr.lsetxattr, +                   [path, +                    '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), +                    struct.pack('!II', *mark)], +                   [ENOENT]) + +    @classmethod +    @_pathguard      def set_xtime(cls, path, uuid, mark):          """set @mark as xtime for @uuid on @path"""          errno_wrap(Xattr.lsetxattr, @@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def gmaster_instantiate_tuple(self, slave):          """return a tuple of the 'one shot' and the 'main crawl'          class instance""" +        from master import gmaster_builder          return (gmaster_builder('xsync')(self, slave),                  gmaster_builder()(self, slave),                  gmaster_builder('changeloghistory')(self, slave)) @@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                                                uuid + '.' + gconf.slave_id)                          ),                          slave.server) +                    slave.server.entry_stime = types.MethodType( +                        lambda _self, path, uuid: ( +                            brickserver.entry_stime( +                                path, +                                uuid + '.' + gconf.slave_id) +                        ), +                        slave.server)                      slave.server.set_stime = types.MethodType(                          lambda _self, path, uuid, mark: (                              brickserver.set_stime(path, @@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                                                    mark)                          ),                          slave.server) +                    slave.server.set_entry_stime = types.MethodType( +                        lambda _self, path, uuid, mark: ( +                            brickserver.set_entry_stime( +                                path, +                                uuid + '.' 
+ gconf.slave_id, +                                mark) +                        ), +                        slave.server)                  (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)                  g1.master.server = brickserver                  g2.master.server = brickserver @@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              except ChangelogHistoryNotAvailable:                  logging.info('Changelog history not available, using xsync')                  g1.crawlwrap(oneshot=True, register_time=register_time) -            except NoPurgeTimeAvailable: +            except NoStimeAvailable:                  logging.info('No stime available, using xsync crawl')                  g1.crawlwrap(oneshot=True, register_time=register_time)              except ChangelogException as e: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 40eff050a9e..f7beb947efc 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -45,7 +45,7 @@ except ImportError:  # auxiliary gfid based access prefix  _CL_AUX_GFID_PFX = ".gfid/" -GF_OP_RETRIES = 20 +GF_OP_RETRIES = 10  CHANGELOG_AGENT_SERVER_VERSION = 1.0  CHANGELOG_AGENT_CLIENT_VERSION = 1.0 @@ -494,15 +494,18 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):  def lstat(e):      return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE]) -class NoPurgeTimeAvailable(Exception): + +class NoStimeAvailable(Exception):      pass  class PartialHistoryAvailable(Exception):      pass +  class ChangelogHistoryNotAvailable(Exception):      pass +  class ChangelogException(OSError):      pass  | 
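For reference, the new entry_stime marker introduced in resource.py is stored exactly like the existing stime: a (seconds, nanoseconds) pair packed as two big-endian unsigned 32-bit integers into an xattr named <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime. Because it is advanced after every processed changelog (while the data stime moves only per batch), a restarted worker only has to replay entry ops from the last changelog. Below is a minimal round-trip sketch of just the encoding used by set_entry_stime()/entry_stime(); the real code wraps this in Xattr.lsetxattr/lgetxattr with errno_wrap, which is omitted here.

    # Sketch: packing and unpacking the entry_stime tuple stored in the xattr value.
    import struct

    entry_stime_to_update = (1454937599, 0)    # i.e. (int(change_ts) - 1, 0)

    # set_entry_stime(): two unsigned 32-bit ints, network byte order
    val = struct.pack('!II', *entry_stime_to_update)
    assert len(val) == 8                       # matches the 8-byte lgetxattr read

    # entry_stime(): unpack the 8-byte xattr value back into the tuple
    assert struct.unpack('!II', val) == entry_stime_to_update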