| author | Aravinda VK <avishwan@redhat.com> | 2017-11-30 12:52:30 +0530 | 
|---|---|---|
| committer | Kotresh HR <khiremat@redhat.com> | 2018-01-23 03:03:01 +0000 | 
| commit | 7c9b62cfff34d1ac4c8fa0822b18e51c15e6db81 (patch) | |
| tree | be9bab79cdb0762e644063f24f524cd2b5487308 | |
| parent | 8efa3ee675a991410c6aa27dce40d4dd441d0935 (diff) | |
geo-rep: Support for using Volinfo from Conf file
Once Geo-replication is started, it runs Gluster commands to get Volume
info from Master and Slave. With this patch, Geo-rep can get Volume info
from the Conf file if the `--use-gconf-volinfo` argument is passed to
`gsyncd monitor`.
Create a config (or add to the existing config) with the following fields:
    [vars]
    master-bricks=NODEID:HOSTNAME:PATH,..
    slave-bricks=NODEID:HOSTNAME,..
    master-volume-id=
    slave-volume-id=
    master-replica-count=
    master-disperse_count=
Note: Existing Geo-replication sessions are not affected, since this is
activated only when `--use-gconf-volinfo` is passed while spawning
`gsyncd monitor`.
Tiering support is not yet added since Tiering + Glusterd2 is still
under discussion.
Fixes: #396
Change-Id: I281baccbad03686c00f6488a8511dd6db0edc57a
Signed-off-by: Aravinda VK <avishwan@redhat.com>
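
As an illustration (not part of this patch), a session config populated by Glusterd with these fields might look like the sketch below; every node ID, hostname, and path here is a made-up placeholder:

```ini
[vars]
master-bricks=NODEID1:mnode1:/bricks/b1,NODEID2:mnode2:/bricks/b2
slave-bricks=NODEID3:snode1,NODEID4:snode2
master-volume-id=MASTER-VOLUME-UUID
slave-volume-id=SLAVE-VOLUME-UUID
master-replica-count=2
master-disperse_count=0
```

With such a config in place, the monitor is spawned with `--use-gconf-volinfo` so that it reads this data instead of running `gluster volume info` against Master and Slave.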
| -rw-r--r-- | geo-replication/gsyncd.conf.in | 21 |
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 1 |
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 68 |
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 88 |
4 files changed, 128 insertions, 50 deletions
```diff
diff --git a/geo-replication/gsyncd.conf.in b/geo-replication/gsyncd.conf.in
index 53cc76b842a..80a9e4a8e8b 100644
--- a/geo-replication/gsyncd.conf.in
+++ b/geo-replication/gsyncd.conf.in
@@ -1,6 +1,26 @@
 [__meta__]
 version = 4.0
 
+[master-bricks]
+configurable=false
+
+[slave-bricks]
+configurable=false
+
+[master-volume-id]
+configurable=false
+
+[slave-volume-id]
+configurable=false
+
+[master-replica-count]
+configurable=false
+type=int
+
+[master-disperse_count]
+configurable=false
+type=int
+
 [glusterd-workdir]
 value = @GLUSTERD_WORKDIR@
 
@@ -234,6 +254,7 @@ allowed_values=ERROR,INFO,WARNING,DEBUG
 value=22
 validation=int
 help=Set SSH port
+type=int
 
 [ssh-command]
 value=ssh
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 04ceb435bf7..3458898646e 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -56,6 +56,7 @@ def main():
                    help="Start with Paused state")
     p.add_argument("--local-node-id", help="Local Node ID")
     p.add_argument("--debug", action="store_true")
+    p.add_argument("--use-gconf-volinfo", action="store_true")
 
     # Worker
     p = sp.add_parser("worker")
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a193b57caff..257d34a743b 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -14,7 +14,6 @@ import time
 import signal
 import logging
 import xml.etree.ElementTree as XET
-from subprocess import PIPE
 from threading import Lock
 from errno import ECHILD, ESRCH
 import random
@@ -23,9 +22,9 @@ from resource import SSH
 import gsyncdconfig as gconf
 from rconf import rconf
 from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import Thread, finalize, Popen, Volinfo
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import set_term_handler, GsyncdError
+from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf
+from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes
 from gsyncdstatus import GeorepStatus, set_monitor_status
 
 
@@ -54,43 +53,6 @@ def get_subvol_num(brick_idx, vol, hot):
         return str(cnt)
 
 
-def get_slave_bricks_status(host, vol):
-    po = Popen(['gluster', '--xml', '--remote-host=' + host,
-                'volume', 'status', vol, "detail"],
-               stdout=PIPE, stderr=PIPE)
-    vix = po.stdout.read()
-    po.wait()
-    po.terminate_geterr(fail_on_err=False)
-    if po.returncode != 0:
-        logging.info(lf("Volume status command failed, unable to get "
-                        "list of up nodes, returning empty list",
-                        volume=vol,
-                        error=po.returncode))
-        return []
-    vi = XET.fromstring(vix)
-    if vi.find('opRet').text != '0':
-        logging.info(lf("Unable to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=vi.find('opErrstr').text))
-        return []
-
-    up_hosts = set()
-
-    try:
-        for el in vi.findall('volStatus/volumes/volume/node'):
-            if el.find('status').text == '1':
-                up_hosts.add((el.find('hostname').text,
-                              el.find('peerid').text))
-    except (ParseError, AttributeError, ValueError) as e:
-        logging.info(lf("Parsing failed to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=e))
-
-    return list(up_hosts)
-
-
 class Monitor(object):
 
     """class which spawns and manages gsyncd workers"""
@@ -116,7 +78,7 @@ class Monitor(object):
         errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
 
     def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
-                suuid):
+                suuid, slavenodes):
         """the monitor loop
 
         Basic logic is a blantantly simple blunt heuristics:
@@ -180,8 +142,7 @@
             # If the connected slave node is down then try to connect to
             # different up node.
             current_slave_host = remote_host
-            slave_up_hosts = get_slave_bricks_status(
-                slave_host, slave_vol)
+            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
 
             if (current_slave_host, remote_id) not in slave_up_hosts:
                 if len(slave_up_hosts) > 0:
@@ -354,7 +315,7 @@
         self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
         return ret
 
-    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
+    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
         argv = [os.path.basename(sys.executable), sys.argv[0]]
 
         cpids = set()
@@ -363,7 +324,7 @@
         for wx in wspx:
             def wmon(w):
                 cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host, master, suuid)
+                                       slave_host, master, suuid, slavenodes)
                 time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
@@ -380,7 +341,10 @@
 
 
 def distribute(master, slave):
-    mvol = Volinfo(master.volume, master.host)
+    if rconf.args.use_gconf_volinfo:
+        mvol = VolinfoFromGconf(master.volume, master=True)
+    else:
+        mvol = Volinfo(master.volume, master.host)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
     slave_host = None
@@ -393,7 +357,11 @@
 
     logging.debug('slave SSH gateway: ' + slave.remote_addr)
 
-    svol = Volinfo(slave.volume, "localhost", prelude)
+    if rconf.args.use_gconf_volinfo:
+        svol = VolinfoFromGconf(slave.volume, master=False)
+    else:
+        svol = Volinfo(slave.volume, "localhost", prelude)
+
     sbricks = svol.bricks
     suuid = svol.uuid
     slave_host = slave.remote_addr.split('@')[-1]
@@ -415,14 +383,14 @@
 
     workerspex = []
     for idx, brick in enumerate(mvol.bricks):
-        if is_host_local(brick['uuid']):
+        if rconf.args.local_node_id == brick['uuid']:
             is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
             workerspex.append((brick,
                                slaves[idx % len(slaves)],
                                get_subvol_num(idx, mvol, is_hot),
                                is_hot))
     logging.debug('worker specs: ' + repr(workerspex))
-    return workerspex, suuid, slave_vol, slave_host, master
+    return workerspex, suuid, slave_vol, slave_host, master, slavenodes
 
 
 def monitor(local, remote):
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 1f2692254db..e546f558265 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -18,6 +18,7 @@ import logging
 import errno
 import threading
 import subprocess
+import socket
 from subprocess import PIPE
 from threading import Lock, Thread as baseThread
 from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
@@ -871,3 +872,90 @@
             return int(self.get('hotBricks/hotbrickCount')[0].text)
         else:
             return 0
+
+
+class VolinfoFromGconf(object):
+    # Glusterd will generate following config items before Geo-rep start
+    # So that Geo-rep need not run gluster commands from inside
+    # Volinfo object API/interface kept as is so that caller need not
+    # change anything exept calling this instead of Volinfo()
+    #
+    # master-bricks=
+    # master-bricks=NODEID:HOSTNAME:PATH,..
+    # slave-bricks=NODEID:HOSTNAME,..
+    # master-volume-id=
+    # slave-volume-id=
+    # master-replica-count=
+    # master-disperse_count=
+    def __init__(self, vol, host='localhost', master=True):
+        self.volume = vol
+        self.host = host
+        self.master = master
+
+    def is_tier(self):
+        return False
+
+    def is_hot(self, brickpath):
+        return False
+
+    @property
+    @memoize
+    def bricks(self):
+        pfx = "master-" if self.master else "slave-"
+        bricks_data = gconf.get(pfx + "bricks")
+        if bricks_data is None:
+            return []
+
+        bricks_data = bricks_data.split(",")
+        bricks_data = [b.strip() for b in bricks_data]
+        out = []
+        for b in bricks_data:
+            parts = b.split(":")
+            bpath = parts[2] if len(parts) == 3 else ""
+            out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]})
+
+        return out
+
+    @property
+    @memoize
+    def uuid(self):
+        if self.master:
+            return gconf.get("master-volume-id")
+        else:
+            return gconf.get("slave-volume-id")
+
+    def replica_count(self, tier, hot):
+        return gconf.get("master-replica-count")
+
+    def disperse_count(self, tier, hot):
+        return gconf.get("master-disperse-count")
+
+    @property
+    @memoize
+    def hot_bricks(self):
+        return []
+
+    def get_hot_bricks_count(self, tier):
+        return 0
+
+
+def can_ssh(host, port=22):
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        s.connect((host, port))
+        flag = True
+    except socket.error:
+        flag = False
+
+    s.close()
+    return flag
+
+
+def get_up_nodes(hosts, port):
+    # List of hosts with Hostname/IP and UUID
+    up_nodes = []
+    for h in hosts:
+        if can_ssh(h[0], port):
+            up_nodes.append(h)
+
+    return up_nodes
```
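
For context, here is a small standalone Python sketch (independent of the gsyncd modules; hostnames and node IDs are placeholders) of the idea behind `VolinfoFromGconf.bricks`, `can_ssh`, and `get_up_nodes`: brick lists arrive as comma-separated `NODEID:HOSTNAME[:PATH]` strings, and a slave node counts as "up" if a TCP connection to its SSH port succeeds. It simplifies the patch's code (for example, it adds a connect timeout) and is only meant to show the data flow:

```python
import socket


def parse_slave_bricks(value):
    # "NODEID:HOSTNAME,..." -> list of (hostname, nodeid) tuples
    nodes = []
    for item in [b.strip() for b in value.split(",") if b.strip()]:
        parts = item.split(":")
        nodes.append((parts[1], parts[0]))
    return nodes


def can_ssh(host, port=22, timeout=5):
    # A node is considered reachable if connecting to its SSH port succeeds.
    try:
        sock = socket.create_connection((host, port), timeout)
        sock.close()
        return True
    except OSError:
        return False


def get_up_nodes(nodes, port=22):
    # Keep only the nodes whose SSH port is reachable.
    return [n for n in nodes if can_ssh(n[0], port)]


if __name__ == "__main__":
    # Hypothetical slave-bricks value; hostnames and node IDs are placeholders.
    slave_bricks = "node-id-1:snode1.example.com,node-id-2:snode2.example.com"
    print(get_up_nodes(parse_slave_bricks(slave_bricks), port=22))
```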
