Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py  307
1 file changed, 121 insertions, 186 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 3df08e41a13..7d015aee718 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -26,12 +26,6 @@ from syncdutils import Thread, GsyncdError, boolify, escape
 from syncdutils import unescape, gauxpfx, md5hex, selfkill
 from syncdutils import lstat, errno_wrap, FreeObject
 from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from changelogsdb import db_init, db_record_data, db_record_meta
-from changelogsdb import db_remove_data, db_remove_meta
-from changelogsdb import db_get_data, db_get_meta, db_commit
-from changelogsdb import db_get_data_count, db_get_meta_count
-from changelogsdb import db_delete_meta_if_exists_in_data
-
 URXTIME = (-1, 0)
@@ -51,10 +45,6 @@ CHANGELOG_ROLLOVER_TIME = 15
 # that batch since stime will get updated after each batch.
 MAX_CHANGELOG_BATCH_SIZE = 727040
 
-# Number of record to query once
-DB_PAGINATION_SIZE_META = 100
-DB_PAGINATION_SIZE_DATA = 1000
-
 # Utility functions to help us to get to closer proximity
 # of the DRY principle (no, don't look for elevated or
 # perspectivistic things here)
@@ -287,7 +277,7 @@ class TarSSHEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        db_remove_data(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -322,7 +312,7 @@ class RsyncEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        db_remove_data(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -427,6 +417,7 @@ class GMasterCommon(object):
         self.volinfo = None
         self.terminate = False
         self.sleep_interval = 1
+        self.unlinked_gfids = set()
 
     def init_keep_alive(cls):
         """start the keep-alive thread """
@@ -666,11 +657,6 @@ class GMasterCommon(object):
             ret = j[-1]()
             if not ret:
                 succeed = False
-
-        # All the unlinked GFIDs removed from Data and Meta list
-        # Commit the Transaction
-        db_commit()
-
         if succeed and not args[0] is None:
             self.sendmark(path, *args)
         return succeed
@@ -783,12 +769,10 @@ class GMasterChangelogMixin(GMasterCommon):
         pfx = gauxpfx()
         clist = []
         entries = []
-        change_ts = change.split(".")[-1]
+        meta_gfid = set()
+        datas = set()
 
-        # self.data_batch_start is None only in beginning and during
-        # new batch start
-        if self.data_batch_start is None:
-            self.data_batch_start = change_ts
+        change_ts = change.split(".")[-1]
 
         # Ignore entry ops which are already processed in Changelog modes
         ignore_entry_ops = False
@@ -860,10 +844,9 @@ class GMasterChangelogMixin(GMasterCommon):
                     # not fail
                     pt = os.path.join(pfx, ec[0])
                     st = lstat(pt)
-                    if isinstance(st, int):
+                    if pt in datas and isinstance(st, int):
                         # file got unlinked, May be historical Changelog
-                        db_remove_data(pt)
-                        db_remove_meta(pt)
+                        datas.remove(pt)
 
                     if not boolify(gconf.ignore_deletes):
                         if not ignore_entry_ops:
@@ -886,10 +869,10 @@ class GMasterChangelogMixin(GMasterCommon):
                                 # CREATED if source not exists.
                                 entries.append(edct('LINK', stat=st, entry=en,
                                                gfid=gfid))
+
                                 # Here, we have the assumption that only
                                 # tier-gfid.linkto causes this mknod. Add data
-                                db_record_data(os.path.join(pfx, ec[0]),
-                                               change_ts)
+                                datas.add(os.path.join(pfx, ec[0]))
                                 continue
 
                     # stat info. present in the changelog itself
@@ -931,25 +914,48 @@ class GMasterChangelogMixin(GMasterCommon):
                     else:
                         logging.warn('ignoring %s [op %s]' % (gfid, ty))
             elif et == self.TYPE_GFID:
-                db_record_data(os.path.join(pfx, ec[0]), change_ts)
+                # If self.unlinked_gfids is available, then that means it is
+                # retrying the changelog second time. Do not add the GFID's
+                # to rsync job if failed previously but unlinked in master
+                if self.unlinked_gfids and \
+                   os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+                    logging.debug("ignoring data, since file purged interim")
+                else:
+                    datas.add(os.path.join(pfx, ec[0]))
            elif et == self.TYPE_META:
                 if ec[1] == 'SETATTR':  # only setattr's for now...
-                    db_record_meta(os.path.join(pfx, ec[0]), change_ts)
+                    if len(ec) == 5:
+                        # In xsync crawl, we already have stat data
+                        # avoid doing stat again
+                        meta_gfid.add((os.path.join(pfx, ec[0]),
+                                       XCrawlMetadata(st_uid=ec[2],
+                                                      st_gid=ec[3],
+                                                      st_mode=ec[4],
+                                                      st_atime=ec[5],
+                                                      st_mtime=ec[6])))
+                    else:
+                        meta_gfid.add((os.path.join(pfx, ec[0]), ))
                 elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
                      ec[1] == 'FXATTROP':
                     # To sync xattr/acls use rsync/tar, --xattrs and --acls
                     # switch to rsync and tar
                     if not boolify(gconf.use_tarssh) and \
                        (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
-                        db_record_data(os.path.join(pfx, ec[0]), change_ts)
+                        datas.add(os.path.join(pfx, ec[0]))
             else:
                 logging.warn('got invalid changelog type: %s' % (et))
         logging.debug('entries: %s' % repr(entries))
 
+        # Increment counters for Status
+        self.status.inc_value("entry", len(entries))
+        self.files_in_batch += len(datas)
+        self.status.inc_value("data", len(datas))
+
         # sync namespace
         if entries and not ignore_entry_ops:
             # Increment counters for Status
             self.status.inc_value("entry", len(entries))
+
             failures = self.slave.server.entry_ops(entries)
             self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
             self.status.dec_value("entry", len(entries))
@@ -969,53 +975,70 @@
             self.skipped_entry_changelogs_last = change_ts
 
-        # Batch data based on number of changelogs as configured as
-        # gconf.max_data_changelogs_in_batch(Default is 24 hrs)
-        # stime will be set after completion of these batch, so on failure
-        # Geo-rep will progress day by day
-        if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
-            # (Start Changelog TS, End Changelog TS, [Changes])
-            self.data_batches.append([self.data_batch_start, change_ts,
-                                      [change]])
-            self.data_batch_start = None
-            self.num_changelogs = 0
-        else:
-            self.data_batches[-1][1] = change_ts
-            self.data_batches[-1][2].append(change)
-
-    def datas_to_queue(self, start, end):
-        # Paginate db entries and add it to Rsync PostBox
-        offset = 0
-        total_datas = 0
-        while True:
-            # Db Pagination
-            datas = db_get_data(start=start, end=end,
-                                limit=DB_PAGINATION_SIZE_DATA,
-                                offset=offset)
-            if len(datas) == 0:
-                break
-            offset += DB_PAGINATION_SIZE_DATA
-            total_datas += len(datas)
+        # sync metadata
+        if meta_gfid:
+            meta_entries = []
+            for go in meta_gfid:
+                if len(go) > 1:
+                    st = go[1]
+                else:
+                    st = lstat(go[0])
+                if isinstance(st, int):
+                    logging.debug('file %s got purged in the interim' % go[0])
+                    continue
+                meta_entries.append(edct('META', go=go[0], stat=st))
+            if meta_entries:
+                self.status.inc_value("meta", len(entries))
+                failures = self.slave.server.meta_ops(meta_entries)
+                self.log_failures(failures, 'go', '', 'META')
+                self.status.dec_value("meta", len(entries))
+
+        # sync data
+        if datas:
             self.a_syncdata(datas)
-        return total_datas
+            self.datas_in_batch.update(datas)
 
-    def handle_data_sync(self, start, end, changes, done, total_datas):
-        """
-        Wait till all rsync jobs are complete, also handle the retries
-        Update data stime Once Rsync jobs are complete.
-        """
-        retry = False
+    def process(self, changes, done=1):
         tries = 0
+        retry = False
+        self.unlinked_gfids = set()
+        self.files_in_batch = 0
+        self.datas_in_batch = set()
         # Error log disabled till the last round
         self.syncer.disable_errorlog()
+        self.skipped_entry_changelogs_first = None
+        self.skipped_entry_changelogs_last = None
+        self.num_skipped_entry_changelogs = 0
+        self.batch_start_time = time.time()
         while True:
-            if retry:
-                self.datas_to_queue(start, end)
+            # first, fire all changelog transfers in parallel. entry and
+            # metadata are performed synchronously, therefore in serial.
+            # However at the end of each changelog, data is synchronized
+            # with syncdata_async() - which means it is serial w.r.t
+            # entries/metadata of that changelog but happens in parallel
+            # with data of other changelogs.
 
-            if retry and tries == (gconf.max_rsync_retries - 1):
-                # Enable Error logging if it is last retry
-                self.syncer.enable_errorlog()
+            if retry:
+                if tries == (int(gconf.max_rsync_retries) - 1):
+                    # Enable Error logging if it is last retry
+                    self.syncer.enable_errorlog()
+
+                # Remove Unlinked GFIDs from Queue
+                for unlinked_gfid in self.unlinked_gfids:
+                    if unlinked_gfid in self.datas_in_batch:
+                        self.datas_in_batch.remove(unlinked_gfid)
+
+                # Retry only Sync. Do not retry entry ops
+                if self.datas_in_batch:
+                    self.a_syncdata(self.datas_in_batch)
+            else:
+                for change in changes:
+                    logging.debug('processing change %s' % change)
+                    self.process_change(change, done, retry)
+                    if not retry:
+                        # number of changelogs processed in the batch
+                        self.turns += 1
 
             # Now we wait for all the data transfers fired off in the above
             # step to complete. Note that this is not ideal either. Ideally
@@ -1038,34 +1061,38 @@
             # @change is the last changelog (therefore max time for this batch)
             if self.syncdata_wait():
+                self.unlinked_gfids = set()
                 if done:
-                    xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
+                    xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
                     map(self.changelog_done_func, changes)
                     self.archive_and_purge_changelogs(changes)
 
                 # Reset Data counter after sync
-                self.status.dec_value("data", total_datas)
+                self.status.dec_value("data", self.files_in_batch)
+                self.files_in_batch = 0
+                self.datas_in_batch = set()
                 break
 
             # We do not know which changelog transfer failed, retry everything.
             retry = True
             tries += 1
-            if tries >= gconf.max_rsync_retries:
+            if tries == int(gconf.max_rsync_retries):
                 logging.error('changelogs %s could not be processed '
                               'completely - moving on...' %
                               ' '.join(map(os.path.basename, changes)))
                 # Reset data counter on failure
-                self.status.dec_value("data", total_datas)
+                self.status.dec_value("data", self.files_in_batch)
+                self.files_in_batch = 0
+                self.datas_in_batch = set()
                 if done:
-                    xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
+                    xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
                     map(self.changelog_done_func, changes)
                     self.archive_and_purge_changelogs(changes)
                 break
-
             # it's either entry_ops() or Rsync that failed to do it's
             # job. Mostly it's entry_ops() [which currently has a problem
             # of failing to create an entry but failing to return an errno]
@@ -1076,8 +1103,21 @@
             logging.warn('incomplete sync, retrying changelogs: %s' %
                          ' '.join(map(os.path.basename, changes)))
+            # Reset the Data counter before Retry
+            self.status.dec_value("data", self.files_in_batch)
+            self.files_in_batch = 0
             time.sleep(0.5)
 
+        # Log the Skipped Entry ops range if any
+        if self.skipped_entry_changelogs_first is not None and \
+           self.skipped_entry_changelogs_last is not None:
+            logging.info("Skipping already processed entry "
+                         "ops from CHANGELOG.{0} to CHANGELOG.{1} "
+                         "Num: {2}".format(
+                             self.skipped_entry_changelogs_first,
+                             self.skipped_entry_changelogs_last,
+                             self.num_skipped_entry_changelogs))
+
         # Log Current batch details
         if changes:
             logging.info(
@@ -1091,94 +1131,6 @@
                     repr(self.get_data_stime()),
                     repr(self.get_entry_stime())))
 
-    def process(self, changes, done=1):
-        retry = False
-        first_changelog_ts = changes[0].split(".")[-1]
-
-        db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
-
-        self.skipped_entry_changelogs_first = None
-        self.skipped_entry_changelogs_last = None
-        self.num_skipped_entry_changelogs = 0
-        self.batch_start_time = time.time()
-        # (Start Changelog TS, End Changelog TS, [Changes])
-        self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
-        self.data_batch_start = None
-        self.num_changelogs = 0
-
-        for change in changes:
-            logging.debug('processing change %s' % change)
-            self.process_change(change, done, retry)
-            # number of changelogs processed in the batch
-            self.turns += 1
-
-        # Rsync/Tar will preserve permissions, so if a GFID exists
-        # in data queue then it syncs meta details too. Remove
-        # all meta from meta table if exists in data table
-        db_delete_meta_if_exists_in_data()
-
-        # All the Data/Meta populated, Commit the Changes in Db
-        db_commit()
-
-        # Log the Skipped Entry ops range if any
-        if self.skipped_entry_changelogs_first is not None and \
-           self.skipped_entry_changelogs_last is not None:
-            logging.info("Skipping already processed entry "
-                         "ops from CHANGELOG.{0} to CHANGELOG.{1} "
-                         "Num: {2}".format(
-                             self.skipped_entry_changelogs_first,
-                             self.skipped_entry_changelogs_last,
-                             self.num_skipped_entry_changelogs))
-
-        # Entry Changelogs syncing finished
-        logging.info("Syncing Entries completed in {0:.4f} seconds "
-                     "CHANGELOG.{1} - CHANGELOG.{2} "
-                     "Num: {3}".format(
-                         time.time() - self.batch_start_time,
-                         changes[0].split(".")[-1],
-                         changes[-1].split(".")[-1],
-                         len(changes)))
-
-        # Update Status Data and Meta Count
-        self.status.inc_value("data", db_get_data_count())
-        self.status.inc_value("meta", db_get_meta_count())
-
-        for b in self.data_batches:
-            # Add to data Queue, so that Rsync will start parallelly
-            # while syncing Meta ops
-            total_datas = self.datas_to_queue(b[0], b[1])
-
-            # Sync Meta
-            offset = 0
-            while True:
-                # Db Pagination
-                meta_gfids = db_get_meta(start=b[0], end=b[1],
-                                         limit=DB_PAGINATION_SIZE_META,
-                                         offset=offset)
-                if len(meta_gfids) == 0:
-                    break
-                offset += DB_PAGINATION_SIZE_META
-
-                # Collect required information for GFIDs which
-                # exists in Master
-                meta_entries = []
-                for go in meta_gfids:
-                    st = lstat(go)
-                    if isinstance(st, int):
-                        logging.debug('file %s got purged in the '
-                                      'interim' % go)
-                        continue
-                    meta_entries.append(edct('META', go=go, stat=st))
-
-                if meta_entries:
-                    failures = self.slave.server.meta_ops(meta_entries)
-                    self.log_failures(failures, 'go', '', 'META')
-                    self.status.dec_value("meta", len(meta_entries))
-
-            # Sync Data, Rsync already started syncing the files
-            # wait for the completion and retry if required.
-            self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
-
     def upd_entry_stime(self, stime):
         self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
                                           self.uuid,
@@ -1206,12 +1158,7 @@
         remote_node_ip = node.split(":")[0]
         self.status.set_slave_node(remote_node_ip)
 
-    def changelogs_batch_process(self, changes, single_batch=False):
-        if single_batch and changes:
-            logging.debug('processing changes %s' % repr(changes))
-            self.process(changes)
-            return
-
+    def changelogs_batch_process(self, changes):
         changelogs_batches = []
         current_size = 0
         for c in changes:
@@ -1245,6 +1192,7 @@
         changes = self.changelog_agent.getchanges()
         if changes:
             if data_stime:
+                logging.info("slave's time: %s" % repr(data_stime))
                 processed = [x for x in changes
                              if int(x.split('.')[-1]) < data_stime[0]]
                 for pr in processed:
@@ -1313,15 +1261,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         # to be processed. returns positive value as number of changelogs
        # to be processed, which will be fetched using
         # history_getchanges()
-        num_scanned_changelogs = self.changelog_agent.history_scan()
-        num_changelogs = num_scanned_changelogs
-        changes = []
-        while num_scanned_changelogs > 0:
+        while self.changelog_agent.history_scan() > 0:
             self.crawls += 1
 
-            changes += self.changelog_agent.history_getchanges()
+            changes = self.changelog_agent.history_getchanges()
             if changes:
                 if data_stime:
+                    logging.info("slave's time: %s" % repr(data_stime))
                     processed = [x for x in changes
                                  if int(x.split('.')[-1]) < data_stime[0]]
                     for pr in processed:
@@ -1330,18 +1276,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
                         self.changelog_done_func(pr)
                         changes.remove(pr)
 
-            if num_changelogs > gconf.max_history_changelogs_in_batch:
-                self.changelogs_batch_process(changes, single_batch=True)
-                num_changelogs = 0
-                changes = []
-
-            num_scanned_changelogs = self.changelog_agent.history_scan()
-            num_changelogs += num_scanned_changelogs
-
-        # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH
-        # condition above
-        if changes:
-            self.changelogs_batch_process(changes, single_batch=True)
+            self.changelogs_batch_process(changes)
 
         history_turn_time = int(time.time()) - self.history_crawl_start_time
