diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 92 | 
1 files changed, 53 insertions, 39 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 5d7234358fb..37f6e1cabc1 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -41,7 +41,7 @@ from syncdutils import get_changelog_log_level, get_rsync_version  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from gsyncdstatus import GeorepStatus  from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list +from syncdutils import mntpt_list, lf  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -228,11 +228,9 @@ class Popen(subprocess.Popen):      def errlog(self):          """make a log about child's failure event""" -        filling = "" -        if self.elines: -            filling = ", saying:" -        logging.error("""command "%s" returned with %s%s""" % -                      (" ".join(self.args), repr(self.returncode), filling)) +        logging.error(lf("command returned error", +                         cmd=" ".join(self.args), +                         error=self.returncode))          lp = ''          def logerr(l): @@ -725,11 +723,12 @@ class Server(object):          def rename_with_disk_gfid_confirmation(gfid, entry, en):              if not matching_disk_gfid(gfid, entry): -                logging.error("RENAME ignored: " -                              "source entry:%s(gfid:%s) does not match with " -                              "on-disk gfid(%s), when attempting to rename " -                              "to %s" % -                              (entry, gfid, cls.gfid_mnt(entry), en)) +                logging.error(lf("RENAME ignored: source entry does not match " +                                 "with on-disk gfid", +                                 source=entry, +                                 gfid=gfid, +                                 disk_gfid=cls.gfid_mnt(entry), +                                 target=en))                  return              cmd_ret = errno_wrap(os.rename, @@ -769,12 +768,17 @@ class Server(object):                              logging.debug("Removed %s => %s/%s recursively" %                                            (gfid, pg, bname))                          else: -                            logging.warn("Recursive remove %s => %s/%s" -                                         "failed: %s" % (gfid, pg, bname, -                                                         os.strerror(er1))) +                            logging.warn(lf("Recursive remove failed", +                                            gfid=gfid, +                                            pgfid=pg, +                                            bname=bname, +                                            error=os.strerror(er1)))                      else: -                        logging.warn("Failed to remove %s => %s/%s. %s" % -                                     (gfid, pg, bname, os.strerror(er))) +                        logging.warn(lf("Failed to remove", +                                        gfid=gfid, +                                        pgfid=pg, +                                        bname=bname, +                                        error=os.strerror(er)))              elif op in ['CREATE', 'MKNOD']:                  slink = os.path.join(pfx, gfid)                  st = lstat(slink) @@ -833,10 +837,11 @@ class Server(object):                                      except OSError as e:                                          if e.errno == ENOTEMPTY:                                              logging.error( -                                                "Unable to delete directory " -                                                "{0}, Both Old({1}) and New{2}" -                                                " directories exists".format( -                                                    entry, entry, en)) +                                                lf("Unable to delete directory" +                                                   ", Both Old and New" +                                                   " directories exists", +                                                   old=entry, +                                                   new=en))                                          else:                                              raise                                  else: @@ -1011,8 +1016,8 @@ class SlaveLocal(object):                  time.sleep(int(gconf.timeout))                  if lp == self.server.last_keep_alive:                      logging.info( -                        "connection inactive for %d seconds, stopping" % -                        int(gconf.timeout)) +                        lf("connection inactive, stopping", +                           timeout=int(gconf.timeout)))                      break          else:              select((), (), ()) @@ -1114,7 +1119,9 @@ class SlaveRemote(object):          if kw.get("log_err", False):              for errline in stderr.strip().split("\n")[:-1]: -                logging.error("SYNC Error(Rsync): %s" % errline) +                logging.error(lf("SYNC Error", +                                 sync_engine="Rsync", +                                 error=errline))          if log_rsync_performance:              rsync_msg = [] @@ -1129,7 +1136,8 @@ class SlaveRemote(object):                     line.startswith("Total bytes received:") or \                     line.startswith("sent "):                      rsync_msg.append(line) -            logging.info("rsync performance: %s" % ", ".join(rsync_msg)) +            logging.info(lf("rsync performance", +                            data=", ".join(rsync_msg)))          return po @@ -1169,7 +1177,9 @@ class SlaveRemote(object):          if log_err:              for errline in stderr1.strip().split("\n")[:-1]: -                logging.error("SYNC Error(Untar): %s" % errline) +                logging.error(lf("SYNC Error", +                                 sync_engine="Tarssh", +                                 error=errline))          return p1 @@ -1389,7 +1399,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  if rv:                      rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \                           (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) -                    logging.warn('stale mount possibly left behind on ' + d) +                    logging.warn(lf('stale mount possibly left behind', +                                    path=d))                      raise GsyncdError("cleaning up temp mountpoint %s "                                        "failed with status %d" %                                        (d, rv)) @@ -1478,7 +1489,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  # if cli terminated with error due to being                  # refused by glusterd, what it put                  # out on stdout is a diagnostic message -                logging.error('glusterd answered: %s' % self.mntpt) +                logging.error(lf('glusterd answered', mnt=self.mntpt))      def connect(self):          """inhibit the resource beyond @@ -1488,7 +1499,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          with given backend          """ -        logging.info ("Mounting gluster volume locally...") +        logging.info("Mounting gluster volume locally...")          t0 = time.time()          label = getattr(gconf, 'mountbroker', None)          if not label and not privileged(): @@ -1500,8 +1511,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              ['log-file=' + gconf.gluster_log_file, 'volfile-server=' +               self.host, 'volfile-id=' + self.volume, 'client-pid=-1']          mounter(params).inhibit(*[l for l in [label] if l]) -        logging.info ("Mounted gluster volume. Time taken: {0:.4f} " -                      "secs".format((time.time() - t0))) +        logging.info(lf("Mounted gluster volume", +                        duration="%.4f" % (time.time() - t0)))      def connect_remote(self, *a, **kw):          sup(self, *a, **kw) @@ -1643,11 +1654,12 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  g2.register(register_time, changelog_agent, status)                  g3.register(register_time, changelog_agent, status)              except ChangelogException as e: -                logging.error("Changelog register failed, %s" % e) +                logging.error(lf("Changelog register failed", error=e))                  sys.exit(1)              g1.register(status=status) -            logging.info("Register time: %s" % register_time) +            logging.info(lf("Register time", +                            time=register_time))              # oneshot: Try to use changelog history api, if not              # available switch to FS crawl              # Note: if config.change_detector is xsync then @@ -1655,8 +1667,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              try:                  g3.crawlwrap(oneshot=True)              except PartialHistoryAvailable as e: -                logging.info('Partial history available, using xsync crawl' -                             ' after consuming history till %s' % str(e)) +                logging.info(lf('Partial history available, using xsync crawl' +                                ' after consuming history', +                                till=e))                  g1.crawlwrap(oneshot=True, register_time=register_time)              except ChangelogHistoryNotAvailable:                  logging.info('Changelog history not available, using xsync') @@ -1665,13 +1678,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  logging.info('No stime available, using xsync crawl')                  g1.crawlwrap(oneshot=True, register_time=register_time)              except ChangelogException as e: -                logging.error("Changelog History Crawl failed, %s" % e) +                logging.error(lf("Changelog History Crawl failed", +                                 error=e))                  sys.exit(1)              try:                  g2.crawlwrap()              except ChangelogException as e: -                logging.error("Changelog crawl failed, %s" % e) +                logging.error(lf("Changelog crawl failed", error=e))                  sys.exit(1)          else:              sup(self, *args) @@ -1763,14 +1777,14 @@ class SSH(AbstractUrl, SlaveRemote):                                   self.inner_rsc.url)          deferred = go_daemon == 'postconn' -        logging.info ("Initializing SSH connection between master and slave...") +        logging.info("Initializing SSH connection between master and slave...")          t0 = time.time()          ret = sup(self, gconf.ssh_command.split() +                    ["-p", str(gconf.ssh_port)] +                    gconf.ssh_ctl_args + [self.remote_addr],                    slave=self.inner_rsc.url, deferred=deferred) -        logging.info ("SSH connection between master and slave established. " -                      "Time taken: {0:.4f} secs".format((time.time() - t0))) +        logging.info(lf("SSH connection between master and slave established.", +                        duration="%.4f" % (time.time() - t0)))          if deferred:              # send a message to peer so that we can wait for  | 
