Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--   geo-replication/syncdaemon/master.py   374
1 file changed, 53 insertions, 321 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e34def6f6ab..cd20a490397 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -15,17 +15,15 @@ import stat
 import json
 import logging
 import fcntl
-import socket
 import string
 import errno
 import tarfile
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN
 from threading import Condition, Lock
 from datetime import datetime
 from gconf import gconf
-from tempfile import NamedTemporaryFile
 from syncdutils import Thread, GsyncdError, boolify, escape
-from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import unescape, gauxpfx, md5hex, selfkill
 from syncdutils import lstat, errno_wrap
 from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
 
@@ -389,18 +387,6 @@ class GMasterCommon(object):
                     raise
         return default_data
 
-    def update_crawl_data(self):
-        if getattr(gconf, 'state_detail_file', None):
-            try:
-                same_dir = os.path.dirname(gconf.state_detail_file)
-                with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
-                    json.dump(self.total_crawl_stats, tmp)
-                    tmp.flush()
-                    os.fsync(tmp.fileno())
-                    os.rename(tmp.name, gconf.state_detail_file)
-            except (IOError, OSError):
-                raise
-
     def __init__(self, master, slave):
         self.master = master
         self.slave = slave
@@ -426,14 +412,12 @@ class GMasterCommon(object):
         self.total_turns = int(gconf.turns)
         self.crawl_start = datetime.now()
         self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
-        self.total_crawl_stats = None
         self.start = None
         self.change_seen = None
         # the actual volinfo we make use of
         self.volinfo = None
         self.terminate = False
         self.sleep_interval = 1
-        self.checkpoint_thread = None
         self.current_files_skipped_count = 0
         self.skipped_gfid_list = []
         self.unlinked_gfids = []
@@ -485,7 +469,6 @@ class GMasterCommon(object):
         logging.debug("Got the lock")
         return True
 
-
     def should_crawl(self):
         if not gconf.use_meta_volume:
             return gconf.glusterd_uuid in self.master.server.node_uuid()
@@ -495,7 +478,6 @@ class GMasterCommon(object):
             sys.exit(1)
         return self.mgmt_lock()
 
-
    def register(self):
         self.register()
 
@@ -534,10 +516,8 @@ class GMasterCommon(object):
             if self.volinfo['retval']:
                 logging.warn("master cluster's info may not be valid %d" %
                              self.volinfo['retval'])
-            self.start_checkpoint_thread()
         else:
             raise GsyncdError("master volinfo unavailable")
-        self.total_crawl_stats = self.get_initial_crawl_data()
         self.lastreport['time'] = time.time()
         logging.info('crawl interval: %d seconds' % self.sleep_interval)
 
@@ -562,7 +542,7 @@ class GMasterCommon(object):
                 t0 = t1
             self.update_worker_remote_node()
             if not crawl:
-                self.update_worker_health("Passive")
+                self.status.set_passive()
                 # bring up _this_ brick to the cluster stime
                 # which is min of cluster (but max of the replicas)
                 brick_stime = self.xtime('.', self.slave)
@@ -589,35 +569,14 @@ class GMasterCommon(object):
                 time.sleep(5)
                 continue
-            self.update_worker_health("Active")
+
+            self.status.set_active()
             self.crawl()
+
             if oneshot:
                 return
             time.sleep(self.sleep_interval)
 
-    @classmethod
-    def _checkpt_param(cls, chkpt, prm, xtimish=True):
-        """use config backend to lookup a parameter belonging to
-           checkpoint @chkpt"""
-        cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
-        if not cprm:
-            return
-        chkpt_mapped, val = cprm.split(':', 1)
-        if unescape(chkpt_mapped) != chkpt:
-            return
-        if xtimish:
-            val = cls.deserialize_xtime(val)
-        return val
-
-    @classmethod
-    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
-        """use config backend to store a parameter associated
-           with checkpoint @chkpt"""
-        if xtimish:
-            val = cls.serialize_xtime(val)
-        gconf.configinterface.set(
-            'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
     @staticmethod
     def humantime(*tpair):
         """format xtime-like (sec, nsec) pair to human readable format"""
@@ -646,116 +605,6 @@ class GMasterCommon(object):
                               string.zfill(m, 2), string.zfill(s, 2))
         return date
 
-    def checkpt_service(self, chan, chkpt):
-        """checkpoint service loop
-
-        monitor and verify checkpoint status for @chkpt, and listen
-        for incoming requests for whom we serve a pretty-formatted
-        status report"""
-        while True:
-            chkpt = gconf.configinterface.get_realtime("checkpoint")
-            if not chkpt:
-                gconf.configinterface.delete("checkpoint_completed")
-                gconf.configinterface.delete("checkpoint_target")
-                # dummy loop for the case when there is no checkpt set
-                select([chan], [], [])
-                conn, _ = chan.accept()
-                conn.send('\0')
-                conn.close()
-                continue
-
-            checkpt_tgt = self._checkpt_param(chkpt, 'target')
-            if not checkpt_tgt:
-                checkpt_tgt = self.xtime('.')
-                if isinstance(checkpt_tgt, int):
-                    raise GsyncdError("master root directory is "
-                                      "unaccessible (%s)",
-                                      os.strerror(checkpt_tgt))
-                self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
-            logging.debug("checkpoint target %s has been determined "
-                          "for checkpoint %s" %
-                          (repr(checkpt_tgt), chkpt))
-
-            # check if the label is 'now'
-            chkpt_lbl = chkpt
-            try:
-                x1, x2 = chkpt.split(':')
-                if x1 == 'now':
-                    chkpt_lbl = "as of " + self.humantime(x2)
-            except:
-                pass
-            completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
-            if completed:
-                completed = tuple(int(x) for x in completed.split('.'))
-            s, _, _ = select([chan], [], [], (not completed) and 5 or None)
-            # either request made and we re-check to not
-            # give back stale data, or we still hunting for completion
-            if (self.native_xtime(checkpt_tgt) and (
-                    self.native_xtime(checkpt_tgt) < self.volmark)):
-                # indexing has been reset since setting the checkpoint
-                status = "is invalid"
-            else:
-                xtr = self.xtime('.', self.slave)
-                if isinstance(xtr, int):
-                    raise GsyncdError("slave root directory is "
-                                      "unaccessible (%s)",
-                                      os.strerror(xtr))
-                ncompleted = self.xtime_geq(xtr, checkpt_tgt)
-                if completed and not ncompleted:  # stale data
-                    logging.warn("completion time %s for checkpoint %s "
-                                 "became stale" %
-                                 (self.humantime(*completed), chkpt))
-                    completed = None
-                    gconf.configinterface.delete('checkpoint_completed')
-                if ncompleted and not completed:  # just reaching completion
-                    completed = "%.6f" % time.time()
-                    self._set_checkpt_param(
-                        chkpt, 'completed', completed, xtimish=False)
-                    completed = tuple(int(x) for x in completed.split('.'))
-                    logging.info("checkpoint %s completed" % chkpt)
-                status = completed and \
-                    "completed at " + self.humantime(completed[0]) or \
-                    "not reached yet"
-            if s:
-                conn = None
-                try:
-                    conn, _ = chan.accept()
-                    try:
-                        conn.send("checkpoint %s is %s\0" %
-                                  (chkpt_lbl, status))
-                    except:
-                        exc = sys.exc_info()[1]
-                        if ((isinstance(exc, OSError) or isinstance(
-                                exc, IOError)) and exc.errno == EPIPE):
-                            logging.debug('checkpoint client disconnected')
-                        else:
-                            raise
-                finally:
-                    if conn:
-                        conn.close()
-
-    def start_checkpoint_thread(self):
-        """prepare and start checkpoint service"""
-        if self.checkpoint_thread or not (
-            getattr(gconf, 'state_socket_unencoded', None) and getattr(
-                gconf, 'socketdir', None)
-        ):
-            return
-        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        state_socket = os.path.join(
-            gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
-        try:
-            os.unlink(state_socket)
-        except:
-            if sys.exc_info()[0] == OSError:
-                pass
-        chan.bind(state_socket)
-        chan.listen(1)
-        chkpt = gconf.configinterface.get_realtime("checkpoint")
-        t = Thread(target=self.checkpt_service, args=(chan, chkpt))
-        t.start()
-        self.checkpoint_thread = t
-
     def add_job(self, path, label, job, *a, **kw):
         """insert @job function to job table at @path with @label"""
         if self.jobtab.get(path) is None:
@@ -929,11 +778,15 @@ class GMasterChangelogMixin(GMasterCommon):
             files_pending['purge'] += 1
 
         def log_failures(failures, entry_key, gfid_prefix, log_prefix):
+            num_failures = 0
             for failure in failures:
                 st = lstat(os.path.join(gfid_prefix,
                                         failure[0][entry_key]))
                 if not isinstance(st, int):
+                    num_failures += 1
                     logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+            self.status.inc_value("failures", num_failures)
+
         for e in clist:
             e = e.strip()
             et = e[self.IDX_START:self.IDX_END]   # entry type
@@ -1032,12 +885,18 @@ class GMasterChangelogMixin(GMasterCommon):
             else:
                 logging.warn('got invalid changelog type: %s' % (et))
         logging.debug('entries: %s' % repr(entries))
-        if not retry:
-            self.update_worker_cumilitive_status(files_pending)
+
+        # Increment counters for Status
+        self.status.inc_value("entry", len(entries))
+        self.files_in_batch = len(datas)
+        self.status.inc_value("data", self.files_in_batch)
+
         # sync namespace
         if entries:
             failures = self.slave.server.entry_ops(entries)
             log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+            self.status.dec_value("entry", len(entries))
+
         # sync metadata
         if meta_gfid:
             meta_entries = []
@@ -1051,8 +910,11 @@ class GMasterChangelogMixin(GMasterCommon):
                     continue
                 meta_entries.append(edct('META', go=go[0], stat=st))
             if meta_entries:
+                self.status.inc_value("meta", len(entries))
                 failures = self.slave.server.meta_ops(meta_entries)
                 log_failures(failures, 'go', '', 'META')
+                self.status.dec_value("meta", len(entries))
+
         # sync data
         if datas:
             self.a_syncdata(datas)
@@ -1104,9 +966,17 @@ class GMasterChangelogMixin(GMasterCommon):
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
+                    chkpt_time = gconf.configinterface.get_realtime(
+                        "checkpoint")
+                    checkpoint_time = 0
+                    if chkpt_time is not None:
+                        checkpoint_time = int(chkpt_time)
+
+                    self.status.set_last_synced(xtl, checkpoint_time)
                     map(self.changelog_done_func, changes)
                     self.archive_and_purge_changelogs(changes)
-                self.update_worker_files_syncd()
+                self.status.dec_value("data", self.files_in_batch)
+                self.files_in_batch = 0
                 break
 
             # We do not know which changelog transfer failed, retry everything.
@@ -1116,14 +986,22 @@ class GMasterChangelogMixin(GMasterCommon):
                 logging.warn('changelogs %s could not be processed - '
                              'moving on...' %
                              ' '.join(map(os.path.basename, changes)))
-                self.update_worker_total_files_skipped(
-                    self.current_files_skipped_count)
+                self.status.inc_value("failures",
+                                      self.current_files_skipped_count)
                 logging.warn('SKIPPED GFID = %s' %
                              ','.join(self.skipped_gfid_list))
-                self.update_worker_files_syncd()
+
+                self.files_in_batch = 0
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
+                    chkpt_time = gconf.configinterface.get_realtime(
+                        "checkpoint")
+                    checkpoint_time = 0
+                    if chkpt_time is not None:
+                        checkpoint_time = int(chkpt_time)
+
+                    self.status.set_last_synced(xtl, checkpoint_time)
                     map(self.changelog_done_func, changes)
                     self.archive_and_purge_changelogs(changes)
                 break
@@ -1144,164 +1022,15 @@ class GMasterChangelogMixin(GMasterCommon):
         if not stime == URXTIME:
             self.sendmark(path, stime)
 
-    def get_worker_status_file(self):
-        file_name = gconf.local_path + '.status'
-        file_name = file_name.replace("/", "_")
-        worker_status_file = gconf.georep_session_working_dir + file_name
-        return worker_status_file
-
-    def update_worker_status(self, key, value):
-        default_data = {"remote_node": "N/A",
-                        "worker status": "Not Started",
-                        "crawl status": "N/A",
-                        "files_syncd": 0,
-                        "files_remaining": 0,
-                        "bytes_remaining": 0,
-                        "purges_remaining": 0,
-                        "total_files_skipped": 0}
-        worker_status_file = self.get_worker_status_file()
-        try:
-            with open(worker_status_file, 'r+') as f:
-                loaded_data = json.load(f)
-                loaded_data[key] = value
-                os.ftruncate(f.fileno(), 0)
-                os.lseek(f.fileno(), 0, os.SEEK_SET)
-                json.dump(loaded_data, f)
-                f.flush()
-                os.fsync(f.fileno())
-        except (IOError, OSError, ValueError):
-            logging.info('Creating new %s' % worker_status_file)
-            try:
-                with open(worker_status_file, 'wb') as f:
-                    default_data[key] = value
-                    json.dump(default_data, f)
-                    f.flush()
-                    os.fsync(f.fileno())
-            except:
-                raise
-
-    def update_worker_cumilitive_status(self, files_pending):
-        default_data = {"remote_node": "N/A",
-                        "worker status": "Not Started",
-                        "crawl status": "N/A",
-                        "files_syncd": 0,
-                        "files_remaining": 0,
-                        "bytes_remaining": 0,
-                        "purges_remaining": 0,
-                        "total_files_skipped": 0}
-        worker_status_file = self.get_worker_status_file()
-        try:
-            with open(worker_status_file, 'r+') as f:
-                loaded_data = json.load(f)
-                loaded_data['files_remaining'] = files_pending['count']
-                loaded_data['bytes_remaining'] = files_pending['bytes']
-                loaded_data['purges_remaining'] = files_pending['purge']
-                os.ftruncate(f.fileno(), 0)
-                os.lseek(f.fileno(), 0, os.SEEK_SET)
-                json.dump(loaded_data, f)
-                f.flush()
-                os.fsync(f.fileno())
-        except (IOError, OSError, ValueError):
-            logging.info('Creating new %s' % worker_status_file)
-            try:
-                with open(worker_status_file, 'wb') as f:
-                    default_data['files_remaining'] = files_pending['count']
-                    default_data['bytes_remaining'] = files_pending['bytes']
-                    default_data['purges_remaining'] = files_pending['purge']
-                    json.dump(default_data, f)
-                    f.flush()
-                    os.fsync(f.fileno())
-            except:
-                raise
-
     def update_worker_remote_node(self):
         node = sys.argv[-1]
-        node = node.split("@")[-1]
+        node_data = node.split("@")
+        node = node_data[-1]
         remote_node_ip = node.split(":")[0]
-        remote_node_vol = node.split(":")[3]
-        remote_node = remote_node_ip + '::' + remote_node_vol
-        self.update_worker_status('remote_node', remote_node)
-
-    def update_worker_health(self, state):
-        self.update_worker_status('worker status', state)
-
-    def update_worker_crawl_status(self, state):
-        self.update_worker_status('crawl status', state)
-
-    def update_worker_files_syncd(self):
-        default_data = {"remote_node": "N/A",
-                        "worker status": "Not Started",
-                        "crawl status": "N/A",
-                        "files_syncd": 0,
-                        "files_remaining": 0,
-                        "bytes_remaining": 0,
-                        "purges_remaining": 0,
-                        "total_files_skipped": 0}
-        worker_status_file = self.get_worker_status_file()
-        try:
-            with open(worker_status_file, 'r+') as f:
-                loaded_data = json.load(f)
-                loaded_data['files_syncd'] += loaded_data['files_remaining']
-                loaded_data['files_remaining'] = 0
-                loaded_data['bytes_remaining'] = 0
-                loaded_data['purges_remaining'] = 0
-                os.ftruncate(f.fileno(), 0)
-                os.lseek(f.fileno(), 0, os.SEEK_SET)
-                json.dump(loaded_data, f)
-                f.flush()
-                os.fsync(f.fileno())
-        except (IOError, OSError, ValueError):
-            logging.info('Creating new %s' % worker_status_file)
-            try:
-                with open(worker_status_file, 'wb') as f:
-                    json.dump(default_data, f)
-                    f.flush()
-                    os.fsync(f.fileno())
-            except:
-                raise
-
-    def update_worker_files_remaining(self, state):
-        self.update_worker_status('files_remaining', state)
-
-    def update_worker_bytes_remaining(self, state):
-        self.update_worker_status('bytes_remaining', state)
-
-    def update_worker_purges_remaining(self, state):
-        self.update_worker_status('purges_remaining', state)
-
-    def update_worker_total_files_skipped(self, value):
-        default_data = {"remote_node": "N/A",
-                        "worker status": "Not Started",
-                        "crawl status": "N/A",
-                        "files_syncd": 0,
-                        "files_remaining": 0,
-                        "bytes_remaining": 0,
-                        "purges_remaining": 0,
-                        "total_files_skipped": 0}
-        worker_status_file = self.get_worker_status_file()
-        try:
-            with open(worker_status_file, 'r+') as f:
-                loaded_data = json.load(f)
-                loaded_data['total_files_skipped'] = value
-                loaded_data['files_remaining'] -= value
-                os.ftruncate(f.fileno(), 0)
-                os.lseek(f.fileno(), 0, os.SEEK_SET)
-                json.dump(loaded_data, f)
-                f.flush()
-                os.fsync(f.fileno())
-        except (IOError, OSError, ValueError):
-            logging.info('Creating new %s' % worker_status_file)
-            try:
-                with open(worker_status_file, 'wb') as f:
-                    default_data['total_files_skipped'] = value
-                    json.dump(default_data, f)
-                    f.flush()
-                    os.fsync(f.fileno())
-            except:
-                raise
+        self.status.set_slave_node(remote_node_ip)
 
     def crawl(self):
-        self.update_worker_crawl_status("Changelog Crawl")
+        self.status.set_worker_crawl_status("Changelog Crawl")
         changes = []
         # get stime (from the brick) and purge changelogs
         # that are _historical_ to that time.
@@ -1327,16 +1056,17 @@ class GMasterChangelogMixin(GMasterCommon):
                 logging.debug('processing changes %s' % repr(changes))
                 self.process(changes)
 
-    def register(self, register_time, changelog_agent):
+    def register(self, register_time, changelog_agent, status):
         self.changelog_agent = changelog_agent
         self.sleep_interval = int(gconf.change_interval)
         self.changelog_done_func = self.changelog_agent.done
         self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
                                                      ".processed")
+        self.status = status
 
 
 class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
-    def register(self, register_time, changelog_agent):
+    def register(self, register_time, changelog_agent, status):
         self.changelog_agent = changelog_agent
         self.changelog_register_time = register_time
         self.history_crawl_start_time = register_time
@@ -1344,10 +1074,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         self.history_turns = 0
         self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
                                                      ".history/.processed")
+        self.status = status
 
     def crawl(self):
         self.history_turns += 1
-        self.update_worker_crawl_status("History Crawl")
+        self.status.set_worker_crawl_status("History Crawl")
         purge_time = self.get_purge_time()
         logging.info('starting history crawl... turns: %s, stime: %s'
@@ -1429,7 +1160,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
 
     XSYNC_MAX_ENTRIES = 1 << 13
 
-    def register(self, register_time=None, changelog_agent=None):
+    def register(self, register_time=None, changelog_agent=None, status=None):
+        self.status = status
         self.counter = 0
         self.comlist = []
         self.stimes = []
@@ -1460,7 +1192,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         t.start()
         logging.info('starting hybrid crawl..., stime: %s'
                      % repr(self.get_purge_time()))
-        self.update_worker_crawl_status("Hybrid Crawl")
+        self.status.set_worker_crawl_status("Hybrid Crawl")
         while True:
             try:
                 item = self.comlist.pop(0)
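Note: this diff only consumes the interface of the new status object passed into register(): set_active(), set_passive(), set_worker_crawl_status(), set_slave_node(), set_last_synced(), inc_value() and dec_value(). The class behind self.status is added elsewhere in this change and is not shown in master.py. The following is a minimal sketch, assuming a JSON-file-backed tracker similar in spirit to the removed update_worker_* helpers; the class name WorkerStatus, its keys and the file layout are illustrative only, not the actual gsyncd implementation.

# Illustrative sketch only -- not the real status class used by gsyncd.
import json
import os
from tempfile import NamedTemporaryFile


class WorkerStatus(object):
    DEFAULTS = {"worker_status": "Not Started",
                "crawl_status": "N/A",
                "slave_node": "N/A",
                "last_synced": 0,
                "checkpoint_time": 0,
                "entry": 0, "data": 0, "meta": 0, "failures": 0}

    def __init__(self, path):
        self.path = path

    def _load(self):
        # fall back to defaults if the file is missing or unreadable
        try:
            with open(self.path) as f:
                return json.load(f)
        except (IOError, OSError, ValueError):
            return dict(self.DEFAULTS)

    def _store(self, data):
        # write-then-rename so readers never see a half-written file
        with NamedTemporaryFile(dir=os.path.dirname(self.path) or ".",
                                mode="w", delete=False) as tmp:
            json.dump(data, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.rename(tmp.name, self.path)

    def _set(self, key, value):
        data = self._load()
        data[key] = value
        self._store(data)

    def set_active(self):
        self._set("worker_status", "Active")

    def set_passive(self):
        self._set("worker_status", "Passive")

    def set_worker_crawl_status(self, status):
        self._set("crawl_status", status)

    def set_slave_node(self, node):
        self._set("slave_node", node)

    def set_last_synced(self, stime, checkpoint_time):
        # stime is an (sec, nsec) pair, as produced by the changelog crawl
        data = self._load()
        data["last_synced"] = stime[0]
        data["checkpoint_time"] = checkpoint_time
        self._store(data)

    def inc_value(self, key, delta):
        self._set(key, self._load().get(key, 0) + delta)

    def dec_value(self, key, delta):
        self._set(key, max(0, self._load().get(key, 0) - delta))

In the patched code an instance of such an object would be created by the worker and handed to register(register_time, changelog_agent, status), which is why every mixin's register() now stores it as self.status.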
