Diffstat (limited to 'geo-replication')
 geo-replication/syncdaemon/gsyncd.py       |  48
 geo-replication/syncdaemon/gsyncdstatus.py |  22
 geo-replication/syncdaemon/master.py       | 241
 geo-replication/syncdaemon/monitor.py      |  61
 geo-replication/syncdaemon/repce.py        |   8
 geo-replication/syncdaemon/resource.py     |  92
 geo-replication/syncdaemon/syncdutils.py   |  27
 7 files changed, 289 insertions(+), 210 deletions(-)
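Every hunk below applies the same conversion: a free-form, %-interpolated log message becomes a fixed event string plus keyword fields, joined by the new lf() helper that this series adds to syncdutils.py (see the end of the diff). A minimal before/after sketch of the pattern — lf() is copied from the patch, the surrounding calls are illustrative:

    import logging

    logging.basicConfig(level=logging.INFO)

    def lf(event, **kwargs):
        # Helper as added by this series in syncdutils.py: append each
        # keyword argument as a tab-separated key=value field.
        msg = event
        for k, v in kwargs.items():
            msg += "\t{0}={1}".format(k, v)
        return msg

    # Before: positional interpolation, awkward to grep or parse
    logging.info("Worker Status: %s" % "Active")

    # After: stable event name plus structured fields
    logging.info(lf("Worker Status Change", status="Active"))
    # -> Worker Status Change<TAB>status=Active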
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 3ddcb7f5454..932e37d1124 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
 from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
 from libcxattr import Xattr
 import struct
-from syncdutils import get_master_and_slave_data_from_args
+from syncdutils import get_master_and_slave_data_from_args, lf
 
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
 
@@ -127,24 +127,30 @@ def slave_vol_uuid_get(host, vol):
                           stdin=None, stdout=PIPE, stderr=PIPE)
     vix, err = po.communicate()
     if po.returncode != 0:
-        logging.info("Volume info failed, unable to get "
-                     "volume uuid of %s present in %s,"
-                     "returning empty string: %s" %
-                     (vol, host, po.returncode))
+        logging.info(lf("Volume info failed, unable to get "
+                        "volume uuid of slavevol, "
+                        "returning empty string",
+                        slavevol=vol,
+                        slavehost=host,
+                        error=po.returncode))
         return ""
 
     vi = XET.fromstring(vix)
     if vi.find('opRet').text != '0':
-        logging.info("Unable to get volume uuid of %s, "
-                     "present in %s returning empty string: %s" %
-                     (vol, host, vi.find('opErrstr').text))
+        logging.info(lf("Unable to get volume uuid of slavevol, "
+                        "returning empty string",
+                        slavevol=vol,
+                        slavehost=host,
+                        error=vi.find('opErrstr').text))
         return ""
 
     try:
         voluuid = vi.find("volInfo/volumes/volume/id").text
     except (ParseError, AttributeError, ValueError) as e:
-        logging.info("Parsing failed to volume uuid of %s, "
-                     "present in %s returning empty string: %s" %
-                     (vol, host, e))
+        logging.info(lf("Parsing failed to volume uuid of slavevol, "
+                        "returning empty string",
+                        slavevol=vol,
+                        slavehost=host,
+                        error=e))
         voluuid = ""
 
     return voluuid
@@ -692,16 +698,18 @@ def main_i():
 
             if confdata.op == 'set':
                 if confdata.opt == 'checkpoint':
-                    logging.info("Checkpoint Set: %s" % (
-                        human_time_utc(confdata.val)))
+                    logging.info(lf("Checkpoint Set",
+                                    time=human_time_utc(confdata.val)))
                 else:
-                    logging.info("Config Set: %s = %s" % (
-                        confdata.opt, confdata.val))
+                    logging.info(lf("Config Set",
+                                    config=confdata.opt,
+                                    value=confdata.val))
             elif confdata.op == 'del':
                 if confdata.opt == 'checkpoint':
                     logging.info("Checkpoint Reset")
                 else:
-                    logging.info("Config Reset: %s" % confdata.opt)
+                    logging.info(lf("Config Reset",
+                                    config=confdata.opt))
         except IOError:
             if sys.exc_info()[1].errno == ENOENT:
                 # directory of log path is not present,
@@ -722,7 +730,8 @@ def main_i():
         try:
             GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor')
             gconf.log_exit = False
-            logging.info("Monitor Status: %s" % create)
+            logging.info(lf("Monitor Status Change",
+                            status=create))
         except IOError:
             if sys.exc_info()[1].errno == ENOENT:
                 # If log dir not present
@@ -772,7 +781,8 @@ def main_i():
 
     if be_agent:
         os.setsid()
-        logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd))
+        logging.debug(lf("RPC FD",
+                         rpc_fd=repr(gconf.rpc_fd)))
         return agent(Changelog(), gconf.rpc_fd)
 
     if be_monitor:
@@ -786,7 +796,7 @@ def main_i():
             remote.connect_remote(go_daemon='done')
     local.connect()
     if ffd:
-        logging.info ("Closing feedback fd, waking up the monitor")
+        logging.info("Closing feedback fd, waking up the monitor")
         os.close(ffd)
 
     local.service_loop(*[r for r in [remote] if r])
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index dd363baf181..38ca92c73a9 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -20,7 +20,7 @@ from errno import EACCES, EAGAIN, ENOENT
 import logging
 
 from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
-from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED
+from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf
 
 DEFAULT_STATUS = "N/A"
 MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
@@ -225,10 +225,10 @@ class GeorepStatus(object):
                     data["checkpoint_time"] = checkpoint_time
                     data["checkpoint_completion_time"] = curr_time
                     data["checkpoint_completed"] = "Yes"
-                    logging.info("Checkpoint completed. Checkpoint "
-                                 "Time: %s, Completion Time: %s" % (
-                                     human_time_utc(checkpoint_time),
-                                     human_time_utc(curr_time)))
+                    logging.info(lf("Checkpoint completed",
+                                    checkpoint_time=human_time_utc(
+                                        checkpoint_time),
+                                    completion_time=human_time_utc(curr_time)))
                     self.trigger_gf_event_checkpoint_completion(
                         checkpoint_time, curr_time)
 
@@ -238,11 +238,13 @@ class GeorepStatus(object):
 
     def set_worker_status(self, status):
         if self.set_field("worker_status", status):
-            logging.info("Worker Status: %s" % status)
+            logging.info(lf("Worker Status Change",
+                            status=status))
 
     def set_worker_crawl_status(self, status):
         if self.set_field("crawl_status", status):
-            logging.info("Crawl Status: %s" % status)
+            logging.info(lf("Crawl Status Change",
+                            status=status))
 
     def set_slave_node(self, slave_node):
         def merger(data):
@@ -269,12 +271,14 @@ class GeorepStatus(object):
 
     def set_active(self):
         if self.set_field("worker_status", "Active"):
-            logging.info("Worker Status: Active")
+            logging.info(lf("Worker Status Change",
+                            status="Active"))
             self.send_event(EVENT_GEOREP_ACTIVE)
 
     def set_passive(self):
         if self.set_field("worker_status", "Passive"):
-            logging.info("Worker Status: Passive")
+            logging.info(lf("Worker Status Change",
+                            status="Passive"))
             self.send_event(EVENT_GEOREP_PASSIVE)
 
     def get_monitor_status(self):
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index aebaf31dcff..17ec550aafa 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -24,7 +24,7 @@ from datetime import datetime
 from gconf import gconf
 from syncdutils import Thread, GsyncdError, boolify, escape
 from syncdutils import unescape, gauxpfx, md5hex, selfkill
-from syncdutils import lstat, errno_wrap, FreeObject
+from syncdutils import lstat, errno_wrap, FreeObject, lf
 from syncdutils import NoStimeAvailable, PartialHistoryAvailable
 
 URXTIME = (-1, 0)
@@ -54,8 +54,8 @@ def _volinfo_hook_relax_foreign(self):
     fgn_vi = volinfo_sys[self.KFGN]
     if fgn_vi:
         expiry = fgn_vi['timeout'] - int(time.time()) + 1
-        logging.info('foreign volume info found, waiting %d sec for expiry' %
-                     expiry)
+        logging.info(lf('foreign volume info found, waiting for expiry',
+                        expiry=expiry))
         time.sleep(expiry)
         volinfo_sys = self.get_sys_volinfo()
     return volinfo_sys
@@ -90,7 +90,8 @@ def gmaster_builder(excrawl=None):
         modemixin = 'normal'
     changemixin = 'xsync' if gconf.change_detector == 'xsync' \
                   else excrawl or gconf.change_detector
-    logging.debug('setting up %s change detection mode' % changemixin)
+    logging.debug(lf('setting up change detection mode',
+                     mode=changemixin))
     modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
     crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
     sendmarkmixin = boolify(
@@ -256,7 +257,7 @@ class TarSSHEngine(object):
     """
 
     def a_syncdata(self, files):
-        logging.debug('files: %s' % (files))
+        logging.debug(lf("Files", files=files))
 
         for f in files:
             pb = self.syncer.add(f)
@@ -264,7 +265,7 @@ class TarSSHEngine(object):
             def regjob(se, xte, pb):
                 rv = pb.wait()
                 if rv[0]:
-                    logging.debug('synced ' + se)
+                    logging.debug(lf('synced', file=se))
                     return True
                 else:
                     # stat check for file presence
@@ -290,16 +291,16 @@ class RsyncEngine(object):
     """Sync engine that uses rsync(1) for data transfers"""
 
     def a_syncdata(self, files):
-        logging.debug('files: %s' % (files))
+        logging.debug(lf("files", files=files))
 
         for f in files:
-            logging.debug('candidate for syncing %s' % f)
+            logging.debug(lf('candidate for syncing', file=f))
             pb = self.syncer.add(f)
 
             def regjob(se, xte, pb):
                 rv = pb.wait()
                 if rv[0]:
-                    logging.debug('synced ' + se)
+                    logging.debug(lf('synced', file=se))
                     return True
                 else:
                     # stat to check if the file exist
@@ -431,16 +432,16 @@ class GMasterCommon(object):
                 fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                 if not gconf.active_earlier:
                     gconf.active_earlier = True
-                    logging.info("Got lock : %s : Becoming ACTIVE"
-                                 % gconf.local_path)
+                    logging.info(lf("Got lock Becoming ACTIVE",
+                                    brick=gconf.local_path))
                 return True
             except:
                 ex = sys.exc_info()[1]
                 if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                     if not gconf.passive_earlier:
                         gconf.passive_earlier = True
-                        logging.info("Didn't get lock : %s : Becoming PASSIVE"
-                                     % gconf.local_path)
+                        logging.info(lf("Didn't get lock Becoming PASSIVE",
+                                        brick=gconf.local_path))
                     return False
                 raise
 
@@ -449,7 +450,7 @@ class GMasterCommon(object):
             + str(gconf.subvol_num) + ".lock"
         mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
         path = os.path.join(mgmt_lock_dir, bname)
-        logging.debug("lock_file_path: %s" % path)
+        logging.debug(lf("lock file path", path=path))
         try:
             fd = os.open(path, os.O_CREAT | os.O_RDWR)
         except OSError:
@@ -477,15 +478,16 @@ class GMasterCommon(object):
                 # cannot grab, it's taken
                 if not gconf.passive_earlier:
                     gconf.passive_earlier = True
-                    logging.info("Didn't get lock : %s : Becoming PASSIVE"
-                                 % gconf.local_path)
+                    logging.info(lf("Didn't get lock Becoming PASSIVE",
+                                    brick=gconf.local_path))
                 gconf.mgmt_lock_fd = fd
                 return False
             raise
 
         if not gconf.active_earlier:
             gconf.active_earlier = True
-            logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path)
+            logging.info(lf("Got lock Becoming ACTIVE",
+                            brick=gconf.local_path))
         return True
 
     def should_crawl(self):
@@ -533,8 +535,8 @@ class GMasterCommon(object):
         gconf.configinterface.set('volume_id', self.uuid)
         if self.volinfo:
             if self.volinfo['retval']:
-                logging.warn("master cluster's info may not be valid %d" %
-                             self.volinfo['retval'])
+                logging.warn(lf("master cluster's info may not be valid",
+                                error=self.volinfo['retval']))
         else:
             raise GsyncdError("master volinfo unavailable")
         self.lastreport['time'] = time.time()
@@ -566,8 +568,10 @@ class GMasterCommon(object):
                 brick_stime = self.xtime('.', self.slave)
                 cluster_stime = self.master.server.aggregated.stime_mnt(
                     '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
-                logging.debug("Cluster stime: %s | Brick stime: %s" %
-                              (repr(cluster_stime), repr(brick_stime)))
+                logging.debug(lf("Crawl info",
+                                 cluster_stime=cluster_stime,
+                                 brick_stime=brick_stime))
+
                 if not isinstance(cluster_stime, int):
                     if brick_stime < cluster_stime:
                         self.slave.server.set_stime(
@@ -773,8 +777,8 @@ class GMasterChangelogMixin(GMasterCommon):
             st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
             if not isinstance(st, int):
                 num_failures += 1
-                logging.error('%s FAILED: %s' % (log_prefix,
-                                                 repr(failure)))
+                logging.error(lf('%s FAILED' % log_prefix,
+                                 data=failure))
                 if failure[0]['op'] == 'MKDIR':
                     raise GsyncdError("The above directory failed to sync."
                                       " Please fix it to proceed further.")
@@ -826,8 +830,8 @@ class GMasterChangelogMixin(GMasterCommon):
             if self.name == 'live_changelog' or \
                self.name == 'history_changelog':
                 if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
-                    logging.debug('skip ENTRY op: %s if hot tier brick'
-                                  % (ec[self.POS_TYPE]))
+                    logging.debug(lf('skip ENTRY op if hot tier brick',
+                                     op=ec[self.POS_TYPE]))
                     continue
 
             # Data and Meta operations are decided while parsing
@@ -917,7 +921,8 @@ class GMasterChangelogMixin(GMasterCommon):
                     go = os.path.join(pfx, gfid)
                     st = lstat(go)
                     if isinstance(st, int):
-                        logging.debug('file %s got purged in the interim' % go)
+                        logging.debug(lf('file got purged in the interim',
+                                         file=go))
                         continue
 
                     if ty == 'LINK':
@@ -930,7 +935,9 @@ class GMasterChangelogMixin(GMasterCommon):
                         entries.append(
                             edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
                     else:
-                        logging.warn('ignoring %s [op %s]' % (gfid, ty))
+                        logging.warn(lf('ignoring op',
+                                        gfid=gfid,
+                                        type=ty))
             elif et == self.TYPE_GFID:
                 # If self.unlinked_gfids is available, then that means it is
                 # retrying the changelog second time. Do not add the GFID's
@@ -962,7 +969,8 @@ class GMasterChangelogMixin(GMasterCommon):
                        (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
                         datas.add(os.path.join(pfx, ec[0]))
             else:
-                logging.warn('got invalid changelog type: %s' % (et))
+                logging.warn(lf('got invalid fop type',
+                                type=et))
         logging.debug('entries: %s' % repr(entries))
 
         # Increment counters for Status
@@ -1011,7 +1019,8 @@ class GMasterChangelogMixin(GMasterCommon):
                 else:
                     st = lstat(go[0])
                 if isinstance(st, int):
-                    logging.debug('file %s got purged in the interim' % go[0])
+                    logging.debug(lf('file got purged in the interim',
+                                     file=go[0]))
                     continue
                 meta_entries.append(edct('META', go=go[0], stat=st))
             if meta_entries:
@@ -1067,7 +1076,8 @@ class GMasterChangelogMixin(GMasterCommon):
                     self.a_syncdata(self.datas_in_batch)
             else:
                 for change in changes:
-                    logging.debug('processing change %s' % change)
+                    logging.debug(lf('processing change',
+                                     changelog=change))
                     self.process_change(change, done, retry)
                     if not retry:
                         # number of changelogs processed in the batch
@@ -1111,9 +1121,9 @@ class GMasterChangelogMixin(GMasterCommon):
             retry = True
             tries += 1
             if tries == int(gconf.max_rsync_retries):
-                logging.error('changelogs %s could not be processed '
-                              'completely - moving on...' %
-                              ' '.join(map(os.path.basename, changes)))
+                logging.error(lf('changelogs could not be processed '
+                                 'completely - moving on...',
+                                 files=map(os.path.basename, changes)))
                 # Reset data counter on failure
                 self.status.dec_value("data", self.files_in_batch)
 
@@ -1133,8 +1143,8 @@ class GMasterChangelogMixin(GMasterCommon):
             # entry_ops() that failed... so we retry the _whole_ changelog
             # again.
             # TODO: remove entry retries when it's gets fixed.
-            logging.warn('incomplete sync, retrying changelogs: %s' %
-                         ' '.join(map(os.path.basename, changes)))
+            logging.warn(lf('incomplete sync, retrying changelogs',
+                            files=map(os.path.basename, changes)))
 
             # Reset the Data counter before Retry
             self.status.dec_value("data", self.files_in_batch)
@@ -1145,43 +1155,44 @@ class GMasterChangelogMixin(GMasterCommon):
         # Log the Skipped Entry ops range if any
         if self.skipped_entry_changelogs_first is not None and \
            self.skipped_entry_changelogs_last is not None:
-            logging.info("Skipping already processed entry "
-                         "ops from CHANGELOG.{0} to CHANGELOG.{1} "
-                         "Num: {2}".format(
-                             self.skipped_entry_changelogs_first,
-                             self.skipped_entry_changelogs_last,
-                             self.num_skipped_entry_changelogs))
+            logging.info(lf("Skipping already processed entry ops",
+                            from_changelog=self.skipped_entry_changelogs_first,
+                            to_changelog=self.skipped_entry_changelogs_last,
+                            num_changelogs=self.num_skipped_entry_changelogs))
 
         # Log Current batch details
         if changes:
             logging.info(
-                "Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} "
-                "MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} "
-                 "secs ".format (
-                    self.batch_stats["UNLINK"], self.batch_stats["RMDIR"],
-                    self.batch_stats["CREATE"], self.batch_stats["MKNOD"],
-                    self.batch_stats["MKDIR"], self.batch_stats["RENAME"],
-                    self.batch_stats["LINK"], self.batch_stats["SYMLINK"],
-                    self.batch_stats["ENTRY_SYNC_TIME"]))
+                lf("Entry Time Taken",
+                   UNL=self.batch_stats["UNLINK"],
+                   RMD=self.batch_stats["RMDIR"],
+                   CRE=self.batch_stats["CREATE"],
+                   MKN=self.batch_stats["MKNOD"],
+                   MKD=self.batch_stats["MKDIR"],
+                   REN=self.batch_stats["RENAME"],
+                   LIN=self.batch_stats["LINK"],
+                   SYM=self.batch_stats["SYMLINK"],
+                   duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
+
             logging.info(
-                "Metadata Time Taken (SETA:{0}): {1:.4f} secs. "
-                "Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): "
-                "{5:.4f} secs".format(
-                    self.batch_stats["SETATTR"],
-                    self.batch_stats["META_SYNC_TIME"],
-                    self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"],
-                    self.batch_stats["DATA"],
-                    time.time() - self.batch_stats["DATA_START_TIME"]))
+                lf("Data/Metadata Time Taken",
+                   SETA=self.batch_stats["SETATTR"],
+                   meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
+                   SETX=self.batch_stats["SETXATTR"],
+                   XATT=self.batch_stats["XATTROP"],
+                   DATA=self.batch_stats["DATA"],
+                   data_duration="%.4f" % (
+                       time.time() - self.batch_stats["DATA_START_TIME"])))
+
            logging.info(
-                "{0} mode completed in {1:.4f} seconds "
-                "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
-                    self.name,
-                    time.time() - self.batch_start_time,
-                    changes[0].split("/")[-1],
-                    changes[-1].split("/")[-1],
-                    len(changes),
-                    repr(self.get_data_stime()),
-                    repr(self.get_entry_stime())))
+                lf("Batch Completed",
+                   mode=self.name,
+                   duration="%.4f" % (time.time() - self.batch_start_time),
+                   changelog_start=changes[0].split(".")[-1],
+                   changelog_end=changes[-1].split(".")[-1],
+                   num_changelogs=len(changes),
+                   stime=self.get_data_stime(),
+                   entry_stime=self.get_entry_stime()))
 
     def upd_entry_stime(self, stime):
         self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
@@ -1231,7 +1242,8 @@ class GMasterChangelogMixin(GMasterCommon):
                     changelogs_batches[-1].append(c)
 
         for batch in changelogs_batches:
-            logging.debug('processing changes %s' % repr(batch))
+            logging.debug(lf('processing changes',
+                             batch=batch))
            self.process(batch)
 
     def crawl(self):
@@ -1246,13 +1258,14 @@ class GMasterChangelogMixin(GMasterCommon):
         changes = self.changelog_agent.getchanges()
         if changes:
             if data_stime:
-                logging.info("slave's time: %s" % repr(data_stime))
+                logging.info(lf("slave's time",
+                                stime=data_stime))
                 processed = [x for x in changes
                              if int(x.split('.')[-1]) < data_stime[0]]
                 for pr in processed:
-                    logging.info(
-                        'skipping already processed change: %s...' %
-                        os.path.basename(pr))
+                    logging.debug(
+                        lf('skipping already processed change',
+                           changelog=os.path.basename(pr)))
                     self.changelog_done_func(pr)
                     changes.remove(pr)
                 self.archive_and_purge_changelogs(processed)
@@ -1289,10 +1302,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         data_stime = self.get_data_stime()
 
         end_time = int(time.time())
-        logging.info('starting history crawl... turns: %s, stime: %s, '
-                     'etime: %s, entry_stime: %s'
-                     % (self.history_turns, repr(data_stime),
-                        repr(end_time), self.get_entry_stime()))
+        logging.info(lf('starting history crawl',
+                        turns=self.history_turns,
+                        stime=data_stime,
+                        etime=end_time,
+                        entry_stime=self.get_entry_stime()))
 
         if not data_stime or data_stime == URXTIME:
             raise NoStimeAvailable()
@@ -1320,12 +1334,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
             changes = self.changelog_agent.history_getchanges()
             if changes:
                 if data_stime:
-                    logging.info("slave's time: %s" % repr(data_stime))
+                    logging.info(lf("slave's time",
+                                    stime=data_stime))
                     processed = [x for x in changes
                                  if int(x.split('.')[-1]) < data_stime[0]]
                     for pr in processed:
-                        logging.info('skipping already processed change: '
-                                     '%s...' % os.path.basename(pr))
+                        logging.debug(lf('skipping already processed change',
+                                         changelog=os.path.basename(pr)))
                         self.changelog_done_func(pr)
                         changes.remove(pr)
 
@@ -1333,10 +1348,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
 
         history_turn_time = int(time.time()) - self.history_crawl_start_time
 
-        logging.info('finished history crawl syncing, endtime: %s, '
-                     'stime: %s, entry_stime: %s'
-                     % (actual_end, repr(self.get_data_stime()),
-                        self.get_entry_stime()))
+        logging.info(lf('finished history crawl',
+                        endtime=actual_end,
+                        stime=self.get_data_stime(),
+                        entry_stime=self.get_entry_stime()))
 
         # If TS returned from history_changelog is < register_time
         # then FS crawl may be required, since history is only available
@@ -1376,7 +1391,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         self.stimes = []
         self.sleep_interval = 60
         self.tempdir = self.setup_working_dir()
-        logging.info('Working dir: %s' % self.tempdir)
+        logging.info(lf('Working dir',
+                        path=self.tempdir))
         self.tempdir = os.path.join(self.tempdir, 'xsync')
         self.processed_changelogs_dir = self.tempdir
         self.name = "xsync"
@@ -1400,25 +1416,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
             self.Xcrawl()
         t = Thread(target=Xsyncer)
         t.start()
-        logging.info('starting hybrid crawl..., stime: %s'
-                     % repr(self.get_data_stime()))
+        logging.info(lf('starting hybrid crawl',
+                        stime=self.get_data_stime()))
         self.status.set_worker_crawl_status("Hybrid Crawl")
         while True:
             try:
                 item = self.comlist.pop(0)
                 if item[0] == 'finale':
-                    logging.info('finished hybrid crawl syncing, stime: %s'
-                                 % repr(self.get_data_stime()))
+                    logging.info(lf('finished hybrid crawl',
+                                    stime=self.get_data_stime()))
                     break
                 elif item[0] == 'xsync':
-                    logging.info('processing xsync changelog %s' % (item[1]))
+                    logging.info(lf('processing xsync changelog',
+                                    path=item[1]))
                     self.process([item[1]], 0)
                     self.archive_and_purge_changelogs([item[1]])
                 elif item[0] == 'stime':
-                    logging.debug('setting slave time: %s' % repr(item[1]))
+                    logging.debug(lf('setting slave time',
+                                     time=item[1]))
                     self.upd_stime(item[1][1], item[1][0])
                 else:
-                    logging.warn('unknown tuple in comlist (%s)' % repr(item))
+                    logging.warn(lf('unknown tuple in comlist',
+                                    entry=item))
             except IndexError:
                 time.sleep(1)
 
@@ -1496,8 +1515,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
             xtr_root = self.xtime('.', self.slave)
             if isinstance(xtr_root, int):
                 if xtr_root != ENOENT:
-                    logging.warn("slave cluster not returning the "
-                                 "correct xtime for root (%d)" % xtr_root)
+                    logging.warn(lf("slave cluster not returning the "
+                                    "correct xtime for root",
+                                    xtime=xtr_root))
                 xtr_root = self.minus_infinity
         xtl = self.xtime(path)
         if isinstance(xtl, int):
@@ -1505,8 +1525,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         xtr = self.xtime(path, self.slave)
         if isinstance(xtr, int):
             if xtr != ENOENT:
-                logging.warn("slave cluster not returning the "
-                             "correct xtime for %s (%d)" % (path, xtr))
+                logging.warn(lf("slave cluster not returning the "
+                                "correct xtime",
+                                path=path,
+                                xtime=xtr))
             xtr = self.minus_infinity
         xtr = max(xtr, xtr_root)
         zero_zero = (0, 0)
@@ -1521,27 +1543,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         dem = self.master.server.entries(path)
         pargfid = self.master.server.gfid(path)
         if isinstance(pargfid, int):
-            logging.warn('skipping directory %s' % (path))
+            logging.warn(lf('skipping directory',
+                            path=path))
         for e in dem:
             bname = e
             e = os.path.join(path, e)
             xte = self.xtime(e)
             if isinstance(xte, int):
-                logging.warn("irregular xtime for %s: %s" %
-                             (e, errno.errorcode[xte]))
+                logging.warn(lf("irregular xtime",
+                                path=e,
+                                error=errno.errorcode[xte]))
                 continue
             if not self.need_sync(e, xte, xtr):
                 continue
             st = self.master.server.lstat(e)
             if isinstance(st, int):
-                logging.warn('%s got purged in the interim ...' % e)
+                logging.warn(lf('got purged in the interim',
+                                path=e))
                 continue
             if self.is_sticky(e, st.st_mode):
-                logging.debug('ignoring sticky bit file %s' % e)
+                logging.debug(lf('ignoring sticky bit file',
+                                 path=e))
                 continue
             gfid = self.master.server.gfid(e)
             if isinstance(gfid, int):
-                logging.warn('skipping entry %s..' % e)
+                logging.warn(lf('skipping entry',
+                                path=e))
                 continue
             mo = st.st_mode
             self.counter += 1 if ((stat.S_ISDIR(mo) or
@@ -1704,14 +1731,12 @@ class Syncer(object):
             pb.close()
             start = time.time()
             po = self.sync_engine(pb, self.log_err)
-            logging.info("Sync Time Taken (Job:{0} "
-                         "Files:{1} ReturnCode:{2}): "
-                         "{3:.4f} secs".format(
-                             job_id,
-                             len(pb),
-                             po.returncode,
-                             time.time() - start
-                         ))
+            logging.info(lf("Sync Time Taken",
+                            job=job_id,
+                            num_files=len(pb),
+                            return_code=po.returncode,
+                            duration="%.4f" % (time.time() - start)))
+
             if po.returncode == 0:
                 ret = (True, 0)
             elif po.returncode in self.errnos_ok:
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index c54c07d600c..b65f1948050 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -22,7 +22,7 @@ from errno import ECHILD, ESRCH
 import re
 import random
 from gconf import gconf
-from syncdutils import select, waitpid, errno_wrap
+from syncdutils import select, waitpid, errno_wrap, lf
 from syncdutils import set_term_handler, is_host_local, GsyncdError
 from syncdutils import escape, Thread, finalize, memoize
 from syncdutils import gf_event, EVENT_GEOREP_FAULTY
@@ -63,15 +63,17 @@ def get_slave_bricks_status(host, vol):
     po.wait()
     po.terminate_geterr(fail_on_err=False)
     if po.returncode != 0:
-        logging.info("Volume status command failed, unable to get "
-                     "list of up nodes of %s, returning empty list: %s" %
-                     (vol, po.returncode))
+        logging.info(lf("Volume status command failed, unable to get "
+                        "list of up nodes, returning empty list",
+                        volume=vol,
+                        error=po.returncode))
         return []
     vi = XET.fromstring(vix)
     if vi.find('opRet').text != '0':
-        logging.info("Unable to get list of up nodes of %s, "
-                     "returning empty list: %s" %
-                     (vol, vi.find('opErrstr').text))
+        logging.info(lf("Unable to get list of up nodes, "
+                        "returning empty list",
+                        volume=vol,
+                        error=vi.find('opErrstr').text))
         return []
 
     up_hosts = set()
@@ -81,8 +83,10 @@ def get_slave_bricks_status(host, vol):
             if el.find('status').text == '1':
                 up_hosts.add(el.find('hostname').text)
     except (ParseError, AttributeError, ValueError) as e:
-        logging.info("Parsing failed to get list of up nodes of %s, "
-                     "returning empty list: %s" % (vol, e))
+        logging.info(lf("Parsing failed to get list of up nodes, "
+                        "returning empty list",
+                        volume=vol,
+                        error=e))
 
     return list(up_hosts)
 
@@ -271,8 +275,9 @@ class Monitor(object):
 
             # Spawn the worker and agent in lock to avoid fd leak
             self.lock.acquire()
-            logging.info('starting gsyncd worker(%s). Slave node: %s' %
-                         (w[0]['dir'], remote_host))
+            logging.info(lf('starting gsyncd worker',
+                            brick=w[0]['dir'],
+                            slave_node=remote_host))
 
             # Couple of pipe pairs for RPC communication b/w
             # worker and changelog agent.
@@ -336,15 +341,16 @@ class Monitor(object):
 
                 if ret_agent is not None:
                     # Agent is died Kill Worker
-                    logging.info("Changelog Agent died, "
-                                 "Aborting Worker(%s)" % w[0]['dir'])
+                    logging.info(lf("Changelog Agent died, Aborting Worker",
+                                    brick=w[0]['dir']))
                     errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                     nwait(cpid)
                    nwait(apid)
 
                 if ret is not None:
-                    logging.info("worker(%s) died before establishing "
-                                 "connection" % w[0]['dir'])
+                    logging.info(lf("worker died before establishing "
+                                    "connection",
+                                    brick=w[0]['dir']))
                     nwait(apid)  # wait for agent
                 else:
                     logging.debug("worker(%s) connected" % w[0]['dir'])
@@ -353,15 +359,16 @@ class Monitor(object):
                         ret_agent = nwait(apid, os.WNOHANG)
 
                         if ret is not None:
-                            logging.info("worker(%s) died in startup "
-                                         "phase" % w[0]['dir'])
+                            logging.info(lf("worker died in startup phase",
+                                            brick=w[0]['dir']))
                             nwait(apid)  # wait for agent
                             break
 
                        if ret_agent is not None:
                             # Agent is died Kill Worker
-                            logging.info("Changelog Agent died, Aborting "
-                                         "Worker(%s)" % w[0]['dir'])
+                            logging.info(lf("Changelog Agent died, Aborting "
+                                            "Worker",
+                                            brick=w[0]['dir']))
                             errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                             nwait(cpid)
                             nwait(apid)
@@ -369,13 +376,15 @@ class Monitor(object):
 
                         time.sleep(1)
             else:
-                logging.info("worker(%s) not confirmed in %d sec, aborting it. "
-                             "Gsyncd invocation on remote slave via SSH or "
-                             "gluster master mount might have hung. Please "
-                             "check the above logs for exact issue and check "
-                             "master or slave volume for errors. Restarting "
-                             "master/slave volume accordingly might help."
-                             % (w[0]['dir'], conn_timeout))
+                logging.info(
+                    lf("Worker not confirmed after wait, aborting it. "
+                       "Gsyncd invocation on remote slave via SSH or "
+                       "gluster master mount might have hung. Please "
+                       "check the above logs for exact issue and check "
+                       "master or slave volume for errors. Restarting "
+                       "master/slave volume accordingly might help.",
+                       brick=w[0]['dir'],
+                       timeout=conn_timeout))
                 errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                 nwait(apid)  # wait for agent
                 ret = nwait(cpid)
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index d7b17dda796..0ac144930db 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -29,7 +29,7 @@ except ImportError:
     # py 3
     import pickle
 
-from syncdutils import Thread, select
+from syncdutils import Thread, select, lf
 
 pickle_proto = -1
 repce_version = 1.0
@@ -203,8 +203,10 @@ class RepceClient(object):
             meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
         exc, res = rjob.wait()
         if exc:
-            logging.error('call %s (%s) failed on peer with %s' %
-                          (repr(rjob), meth, str(type(res).__name__)))
+            logging.error(lf('call failed on peer',
+                             call=repr(rjob),
+                             method=meth,
+                             error=str(type(res).__name__)))
             raise res
         logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
         return res
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 5d7234358fb..37f6e1cabc1 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -41,7 +41,7 @@ from syncdutils import get_changelog_log_level, get_rsync_version
 from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
 from gsyncdstatus import GeorepStatus
 from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import mntpt_list
+from syncdutils import mntpt_list, lf
 
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
 HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@@ -228,11 +228,9 @@ class Popen(subprocess.Popen):
 
     def errlog(self):
         """make a log about child's failure event"""
-        filling = ""
-        if self.elines:
-            filling = ", saying:"
-        logging.error("""command "%s" returned with %s%s""" %
-                      (" ".join(self.args), repr(self.returncode), filling))
+        logging.error(lf("command returned error",
+                         cmd=" ".join(self.args),
+                         error=self.returncode))
         lp = ''
 
         def logerr(l):
@@ -725,11 +723,12 @@ class Server(object):
 
         def rename_with_disk_gfid_confirmation(gfid, entry, en):
             if not matching_disk_gfid(gfid, entry):
-                logging.error("RENAME ignored: "
-                              "source entry:%s(gfid:%s) does not match with "
-                              "on-disk gfid(%s), when attempting to rename "
-                              "to %s" %
-                              (entry, gfid, cls.gfid_mnt(entry), en))
+                logging.error(lf("RENAME ignored: source entry does not match "
+                                 "with on-disk gfid",
+                                 source=entry,
+                                 gfid=gfid,
+                                 disk_gfid=cls.gfid_mnt(entry),
+                                 target=en))
                 return
 
             cmd_ret = errno_wrap(os.rename,
@@ -769,12 +768,17 @@ class Server(object):
                             logging.debug("Removed %s => %s/%s recursively" %
                                           (gfid, pg, bname))
                         else:
-                            logging.warn("Recursive remove %s => %s/%s"
-                                         "failed: %s" % (gfid, pg, bname,
-                                                         os.strerror(er1)))
+                            logging.warn(lf("Recursive remove failed",
+                                            gfid=gfid,
+                                            pgfid=pg,
+                                            bname=bname,
+                                            error=os.strerror(er1)))
                     else:
-                        logging.warn("Failed to remove %s => %s/%s. %s" %
-                                     (gfid, pg, bname, os.strerror(er)))
+                        logging.warn(lf("Failed to remove",
+                                        gfid=gfid,
+                                        pgfid=pg,
+                                        bname=bname,
+                                        error=os.strerror(er)))
             elif op in ['CREATE', 'MKNOD']:
                 slink = os.path.join(pfx, gfid)
                 st = lstat(slink)
@@ -833,10 +837,11 @@ class Server(object):
                                     except OSError as e:
                                         if e.errno == ENOTEMPTY:
                                             logging.error(
-                                                "Unable to delete directory "
-                                                "{0}, Both Old({1}) and New{2}"
-                                                " directories exists".format(
-                                                    entry, entry, en))
+                                                lf("Unable to delete directory"
+                                                   ", Both Old and New"
+                                                   " directories exists",
+                                                   old=entry,
+                                                   new=en))
                                         else:
                                             raise
                                 else:
@@ -1011,8 +1016,8 @@ class SlaveLocal(object):
                 time.sleep(int(gconf.timeout))
                 if lp == self.server.last_keep_alive:
                     logging.info(
-                        "connection inactive for %d seconds, stopping" %
-                        int(gconf.timeout))
+                        lf("connection inactive, stopping",
+                           timeout=int(gconf.timeout)))
                     break
         else:
             select((), (), ())
@@ -1114,7 +1119,9 @@ class SlaveRemote(object):
 
         if kw.get("log_err", False):
             for errline in stderr.strip().split("\n")[:-1]:
-                logging.error("SYNC Error(Rsync): %s" % errline)
+                logging.error(lf("SYNC Error",
+                                 sync_engine="Rsync",
+                                 error=errline))
 
         if log_rsync_performance:
             rsync_msg = []
@@ -1129,7 +1136,8 @@ class SlaveRemote(object):
                    line.startswith("Total bytes received:") or \
                    line.startswith("sent "):
                     rsync_msg.append(line)
-            logging.info("rsync performance: %s" % ", ".join(rsync_msg))
+            logging.info(lf("rsync performance",
+                            data=", ".join(rsync_msg)))
 
         return po
 
@@ -1169,7 +1177,9 @@ class SlaveRemote(object):
 
         if log_err:
             for errline in stderr1.strip().split("\n")[:-1]:
-                logging.error("SYNC Error(Untar): %s" % errline)
+                logging.error(lf("SYNC Error",
+                                 sync_engine="Tarssh",
+                                 error=errline))
 
         return p1
 
@@ -1389,7 +1399,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
                 if rv:
                     rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
                          (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
-                    logging.warn('stale mount possibly left behind on ' + d)
+                    logging.warn(lf('stale mount possibly left behind',
+                                    path=d))
                     raise GsyncdError("cleaning up temp mountpoint %s "
                                       "failed with status %d" %
                                       (d, rv))
@@ -1478,7 +1489,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
                 # if cli terminated with error due to being
                 # refused by glusterd, what it put
                 # out on stdout is a diagnostic message
-                logging.error('glusterd answered: %s' % self.mntpt)
+                logging.error(lf('glusterd answered', mnt=self.mntpt))
 
     def connect(self):
         """inhibit the resource beyond
@@ -1488,7 +1499,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
 
         with given backend
         """
-        logging.info ("Mounting gluster volume locally...")
+        logging.info("Mounting gluster volume locally...")
         t0 = time.time()
         label = getattr(gconf, 'mountbroker', None)
         if not label and not privileged():
@@ -1500,8 +1511,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
             ['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
              self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
         mounter(params).inhibit(*[l for l in [label] if l])
-        logging.info ("Mounted gluster volume. Time taken: {0:.4f} "
-                      "secs".format((time.time() - t0)))
+        logging.info(lf("Mounted gluster volume",
+                        duration="%.4f" % (time.time() - t0)))
 
     def connect_remote(self, *a, **kw):
         sup(self, *a, **kw)
@@ -1643,11 +1654,12 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
                 g2.register(register_time, changelog_agent, status)
                 g3.register(register_time, changelog_agent, status)
             except ChangelogException as e:
-                logging.error("Changelog register failed, %s" % e)
+                logging.error(lf("Changelog register failed", error=e))
                 sys.exit(1)
 
             g1.register(status=status)
-            logging.info("Register time: %s" % register_time)
+            logging.info(lf("Register time",
+                            time=register_time))
             # oneshot: Try to use changelog history api, if not
             # available switch to FS crawl
             # Note: if config.change_detector is xsync then
@@ -1655,8 +1667,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
             try:
                 g3.crawlwrap(oneshot=True)
             except PartialHistoryAvailable as e:
-                logging.info('Partial history available, using xsync crawl'
-                             ' after consuming history till %s' % str(e))
+                logging.info(lf('Partial history available, using xsync crawl'
+                                ' after consuming history',
+                                till=e))
                 g1.crawlwrap(oneshot=True, register_time=register_time)
             except ChangelogHistoryNotAvailable:
                 logging.info('Changelog history not available, using xsync')
@@ -1665,13 +1678,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
                 logging.info('No stime available, using xsync crawl')
                 g1.crawlwrap(oneshot=True, register_time=register_time)
             except ChangelogException as e:
-                logging.error("Changelog History Crawl failed, %s" % e)
+                logging.error(lf("Changelog History Crawl failed",
+                                 error=e))
                 sys.exit(1)
 
             try:
                 g2.crawlwrap()
             except ChangelogException as e:
-                logging.error("Changelog crawl failed, %s" % e)
+                logging.error(lf("Changelog crawl failed", error=e))
                 sys.exit(1)
         else:
             sup(self, *args)
@@ -1763,14 +1777,14 @@ class SSH(AbstractUrl, SlaveRemote):
                                  self.inner_rsc.url)
 
         deferred = go_daemon == 'postconn'
-        logging.info ("Initializing SSH connection between master and slave...")
+        logging.info("Initializing SSH connection between master and slave...")
         t0 = time.time()
         ret = sup(self, gconf.ssh_command.split() +
                   ["-p", str(gconf.ssh_port)] +
                   gconf.ssh_ctl_args + [self.remote_addr],
                   slave=self.inner_rsc.url, deferred=deferred)
-        logging.info ("SSH connection between master and slave established. "
-                      "Time taken: {0:.4f} secs".format((time.time() - t0)))
+        logging.info(lf("SSH connection between master and slave established.",
+                        duration="%.4f" % (time.time() - t0)))
 
         if deferred:
             # send a message to peer so that we can wait for
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 321e0d32ccc..b5f09459c57 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -304,8 +304,8 @@ def log_raise_exception(excont):
                 gconf.transport.terminate_geterr()
         elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
                                                         ECONNABORTED):
-            logging.error('glusterfs session went down [%s]',
-                          errorcode[exc.errno])
+            logging.error(lf('glusterfs session went down',
+                             error=errorcode[exc.errno]))
         else:
             logtag = "FAIL"
         if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
@@ -387,8 +387,9 @@ def boolify(s):
     if lstr in true_list:
         rv = True
     elif not lstr in false_list:
-        logging.warn("Unknown string (%s) in string to boolean conversion "
-                     "defaulting to False\n" % (s))
+        logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
+                        "defaulting to False",
+                        str=s))
 
     return rv
 
@@ -497,8 +498,9 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
             nr_tries += 1
             if nr_tries == GF_OP_RETRIES:
                 # probably a screwed state, cannot do much...
-                logging.warn('reached maximum retries (%s)...%s' %
-                             (repr(arg), ex))
+                logging.warn(lf('reached maximum retries',
+                                args=repr(arg),
+                                error=ex))
                 raise
             time.sleep(0.250)  # retry the call
 
@@ -572,3 +574,16 @@ def get_rsync_version(rsync_cmd):
         rsync_version = out.split(" ", 4)[3]
 
     return rsync_version
+
+
+def lf(event, **kwargs):
+    """
+    Log Format helper function, log messages can be
+    easily modified to structured log format.
+    lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
+    converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4"
+    """
+    msg = event
+    for k, v in kwargs.items():
+        msg += "\t{0}={1}".format(k, v)
+    return msg
