diff options
| author | Venky Shankar <vshankar@redhat.com> | 2013-07-31 13:01:30 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2013-09-04 19:50:09 -0700 | 
| commit | 603119c57c5055e17693d47b9965216954f45247 (patch) | |
| tree | 802d25574d5ba091e811d15f3b19b2c5bc4adb27 | |
| parent | f0b92a45e5c757ec25257e389c877b9b0de8ed07 (diff) | |
gsyncd / geo-rep: Introduce basic crawl instrumentation
This patch extends the persistent instrumentation work done by
Aravinda (@avishwa), by introducing a handful of instrumentation
variables for crawl. These variables are "pulled up" by glusterd
in the event of a geo-replication status CLI command and look
something like below:
"Uptime=00:21:10;FilesSyned=2982;FilesPending=0;BytesPending=0;DeletesPending=0;"
"FilesPending", "BytesPending" and "DeletesPending" are short-lived
variables that are non-zero when a changelog is being processed (i.e.
when an active sync is ongoing). After a successful changelog process
"FilesPending" is summed up into "FilesSynced". The three short-lived
variables are then reset to zero and the data is persisted.
Additionally this patch also reverts some of the changes made for
BZ #986929 (those were not needed).
Change-Id: I948f1a0884ca71bc5e5bcfdc017d16c8c54fc30b
BUG: 990420
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/5441
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 156 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 14 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 10 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog.c | 4 | 
4 files changed, 104 insertions, 80 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 12eadb107..cf2f7db07 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -17,7 +17,8 @@ from datetime import datetime  from gconf import gconf  from tempfile import mkdtemp, NamedTemporaryFile  from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ -                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb +                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ +                       lstat  URXTIME = (-1, 0) @@ -380,7 +381,13 @@ class GMasterCommon(object):          return self.xtime_low(rsc.server, path, **opts)      def get_initial_crawl_data(self): -        default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} +        # while persisting only 'files_syncd' is non-zero, rest of +        # the stats are nulls. lets keep it that way in case they +        # are needed to be used some day... 
+        default_data = {'files_syncd': 0, +                        'files_remaining': 0, +                        'bytes_remaining': 0, +                        'purges_remaining': 0}          if getattr(gconf, 'state_detail_file', None):              try:                  return json.load(open(gconf.state_detail_file)) @@ -393,7 +400,6 @@ class GMasterCommon(object):                      return default_data                  else:                      raise -          return default_data      def update_crawl_data(self): @@ -422,10 +428,9 @@ class GMasterCommon(object):          self.crawls = 0          self.turns = 0          self.total_turns = int(gconf.turns) +        self.crawl_start = datetime.now()          self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} -        self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, -                            'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0} -        self.total_crawl_stats = self.get_initial_crawl_data() +        self.total_crawl_stats = None          self.start = None          self.change_seen = None          # the authoritative (foreign, native) volinfo pair @@ -491,9 +496,8 @@ class GMasterCommon(object):              # for a passive gsyncd (ie. in a replicate scenario)              # the keepalive thread would keep the connection alive.              
self.init_keep_alive() +	self.total_crawl_stats = self.get_initial_crawl_data()          self.lastreport['time'] = time.time() -        self.crawl_stats['crawl_starttime'] = datetime.now() -          logging.info('crawl interval: %d seconds' % self.sleep_interval)          t0 = time.time()          crawl = self.should_crawl() @@ -556,17 +560,13 @@ class GMasterCommon(object):          return ts      def get_extra_info(self): -        str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) -        str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) - -        self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] - -        str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) -        str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) -        str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) -        str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) -        str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) -        str_info += "\0" +        str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ +            (self._crawl_time_format(datetime.now() - self.crawl_start), \ +                 self.total_crawl_stats['files_syncd'], \ +                 self.total_crawl_stats['files_remaining'], \ +                 self.total_crawl_stats['bytes_remaining'], \ +                 self.total_crawl_stats['purges_remaining']) +        str_info += '\0'          logging.debug(str_info)          return str_info @@ -788,15 +788,11 @@ class GMasterChangelogMixin(GMasterCommon):          logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))          return (workdir, logfile) -    def lstat(self, e): -        try: -            return os.lstat(e) -        except (IOError, OSError): -            ex = sys.exc_info()[1] -            if ex.errno == ENOENT: - 
               return ex.errno -            else: -                raise +    # update stats from *this* crawl +    def update_cumulative_stats(self, files_pending): +        self.total_crawl_stats['files_remaining']  = files_pending['count'] +        self.total_crawl_stats['bytes_remaining']  = files_pending['bytes'] +        self.total_crawl_stats['purges_remaining'] = files_pending['purge']      # sync data      def syncdata(self, datas): @@ -804,43 +800,31 @@ class GMasterChangelogMixin(GMasterCommon):          for data in datas:              logging.debug('candidate for syncing %s' % data)              pb = self.syncer.add(data) -            timeA = datetime.now()              def regjob(se, xte, pb):                  rv = pb.wait()                  if rv[0]:                      logging.debug('synced ' + se) -                    # update stats -                    timeB = datetime.now() -                    self.crawl_stats['last_synctime'] = timeB - timeA -                    self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) -                    self.crawl_stats['files_synced'] += 1 -                    self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced - -                    # cumulative statistics -                    self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced -                    self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) -                    self.total_crawl_stats['files_synced'] += 1                      return True                  else:                      if rv[1] in [23, 24]:                          # stat to check if the file exist -                        st = self.lstat(se) +                        st = lstat(se)                          if isinstance(st, int):                              # file got unlinked in the interim                              return True                      
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))              self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)          if self.wait(self.FLAT_DIR_HIERARCHY, None): -            self.update_crawl_data()              return True -    def process_change(self, change, done): +    def process_change(self, change, done, retry): +        pfx = gauxpfx()          clist   = []          entries = [] -        purges = set() -        links = set()          datas = set() -        pfx = gauxpfx() + +        # basic crawl stats: files and bytes +        files_pending  = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}          try:              f = open(change, "r")              clist = f.readlines() @@ -861,6 +845,27 @@ class GMasterChangelogMixin(GMasterCommon):                  else:                      dct[k] = ed[k]              return dct + +        # regular file update: bytes & count +        def _update_reg(entry, size): +            if not entry in files_pending['files']: +                files_pending['count'] += 1 +                files_pending['bytes'] += size +                files_pending['files'].append(entry) +        # updates for directories, symlinks etc.. 
+        def _update_rest(): +            files_pending['count'] += 1 + +        # entry count +        def entry_update(entry, size, mode): +            if stat.S_ISREG(mode): +                _update_reg(entry, size) +            else: +                _update_rest() +        # purge count +        def purge_update(): +            files_pending['purge'] += 1 +          for e in clist:              e = e.strip()              et = e[self.IDX_START:self.IDX_END] @@ -871,20 +876,19 @@ class GMasterChangelogMixin(GMasterCommon):                  gfid = ec[self.POS_GFID]                  # definitely need a better way bucketize entry ops                  if ty in ['UNLINK', 'RMDIR']: -                  entries.append(edct(ty, gfid=gfid, entry=en)) -                  purges.update([os.path.join(pfx, gfid)]) -                  continue -                if not ty == 'RENAME': -                    go = os.path.join(pfx, gfid) -                    st = self.lstat(go) -                    if isinstance(st, int): -                        logging.debug('file %s got purged in the interim' % go) -                        continue +                    purge_update() +                    entries.append(edct(ty, gfid=gfid, entry=en)) +                    continue +                go = os.path.join(pfx, gfid) +                st = lstat(go) +                if isinstance(st, int): +                    logging.debug('file %s got purged in the interim' % go) +                    continue +                entry_update(go, st.st_size, st.st_mode)                  if ty in ['CREATE', 'MKDIR', 'MKNOD']:                      entries.append(edct(ty, stat=st, entry=en, gfid=gfid))                  elif ty == 'LINK':                      entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) -                    links.update([os.path.join(pfx, gfid)])                  elif ty == 'SYMLINK':                      entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))         
         elif ty == 'RENAME': @@ -893,30 +897,41 @@ class GMasterChangelogMixin(GMasterCommon):                  else:                      pass              elif et in self.TYPE_GFID: -                da = os.path.join(pfx, ec[0]) -                st = self.lstat(da) +                go = os.path.join(pfx, ec[0]) +                st = lstat(go)                  if isinstance(st, int): -                    logging.debug('file %s got purged in the interim' % da) +                    logging.debug('file %s got purged in the interim' % go)                      continue -                datas.update([da]) +                entry_update(go, st.st_size, st.st_mode) +                datas.update([go])          logging.debug('entries: %s' % repr(entries)) +        if not retry: +            self.update_cumulative_stats(files_pending)          # sync namespace          if (entries):              self.slave.server.entry_ops(entries)          # sync data -        if self.syncdata(datas - (purges - links)): +        if self.syncdata(datas):              if done:                  self.master.server.changelog_done(change)              return True +    def sync_done(self): +        self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] +        self.total_crawl_stats['files_remaining']  = 0 +        self.total_crawl_stats['bytes_remaining']  = 0 +        self.total_crawl_stats['purges_remaining'] = 0 +        self.update_crawl_data() +      def process(self, changes, done=1):          for change in changes: -            times = 0 +            retry = False              while True: -                times += 1 -                logging.debug('processing change %s [%d time(s)]' % (change, times)) -                if self.process_change(change, done): +                logging.debug('processing change %s' % change) +                if self.process_change(change, done, retry): +                    self.sync_done()                      break +                
retry = True                  # it's either entry_ops() or Rsync that failed to do it's                  # job. Mostly it's entry_ops() [which currently has a problem                  # of failing to create an entry but failing to return an errno] @@ -1032,7 +1047,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          for e in dem:              bname = e              e = os.path.join(path, e) -            st = self.lstat(e) +            st = lstat(e)              if isinstance(st, int):                  logging.warn('%s got purged in the interim..' % e)                  continue @@ -1191,19 +1206,10 @@ class GMasterXtimeMixin(GMasterCommon):              elif stat.S_ISREG(mo):                  logging.debug("syncing %s ..." % e)                  pb = self.syncer.add(e) -                timeA = datetime.now()                  def regjob(e, xte, pb):                      if pb.wait()[0]:                          logging.debug("synced " + e)                          self.sendmark_regular(e, xte) -                        # update stats -                        timeB = datetime.now() -                        self.crawl_stats['last_synctime'] = timeB - timeA -                        self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) -                        self.crawl_stats['files_synced'] += 1 -                        self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) -                        self.total_crawl_stats['files_synced'] += 1 -                        self.update_crawl_data()                          return True                      else:                          logging.warn("failed to sync " + e) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 1010247ae..2357b4f91 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -21,7 +21,7 @@ from repce import 
RepceServer, RepceClient  from  master import gmaster_builder  import syncdutils  from syncdutils import GsyncdError, select, privileged, boolify, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap +from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat  UrlRX  = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -514,12 +514,20 @@ class Server(object):              elif op == 'MKDIR':                  blob = entry_pack_mkdir(gfid, bname, e['stat'])              elif op == 'LINK': -                errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) +                st = lstat(entry) +                if isinstance(st, int): +                    blob = entry_pack_reg(gfid, bname, e['stat']) +                else: +                    errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST])              elif op == 'SYMLINK':                  blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])              elif op == 'RENAME':                  en = e['entry1'] -                errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) +                st = lstat(entry) +                if isinstance(st, int): +                    blob = entry_pack_reg(gfid, bname, e['stat']) +                else: +                    errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])              if blob:                  errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index c09b2505d..2655dd983 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -419,3 +419,13 @@ def errno_wrap(call, arg=[], errnos=[]):              if not ex.errno == ESTALE:                  raise              time.sleep(0.5)  # retry the call + +def lstat(e): +    try: +        return os.lstat(e) +    except 
(IOError, OSError): +        ex = sys.exc_info()[1] +        if ex.errno == ENOENT: +            return ex.errno +        else: +            raise diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index fe0643429..0e85ee7a0 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -203,7 +203,6 @@ changelog_rename (call_frame_t *frame, xlator_t *this,                    loc_t *oldloc, loc_t *newloc, dict_t *xdata)  {          size_t            xtra_len  = 0; -        uuid_t            null_uuid = {0,};          changelog_priv_t *priv      = NULL;          changelog_opt_t  *co        = NULL; @@ -211,7 +210,8 @@ changelog_rename (call_frame_t *frame, xlator_t *this,          CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);          /* 3 == fop + oldloc + newloc */ -        CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3); +        CHANGELOG_INIT_NOCHECK (this, frame->local, +                                NULL, oldloc->inode->gfid, 3);          co = changelog_get_usable_buffer (frame->local);          if (!co)  | 
