summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/gsyncd.py4
-rw-r--r--geo-replication/syncdaemon/master.py39
-rw-r--r--geo-replication/syncdaemon/monitor.py45
-rw-r--r--geo-replication/syncdaemon/syncdutils.py4
4 files changed, 88 insertions, 4 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index b38f19d33f7..1542810bcd7 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -255,6 +255,8 @@ def main_i():
type=int, default=1)
op.add_option('--changelog-archive-format', metavar='N',
type=str, default="%Y%m")
+ op.add_option('--meta-volume', metavar='N',
+ type=str, default="")
op.add_option(
'--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
op.add_option('--allow-network', metavar='IPS', default='')
@@ -297,6 +299,8 @@ def main_i():
op.add_option('--feedback-fd', dest='feedback_fd', type=int,
help=SUPPRESS_HELP, action='callback', callback=store_local)
op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP)
+ op.add_option('--subvol-num', dest='subvol_num', type=int,
+ help=SUPPRESS_HELP)
op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,
action='callback', callback=store_local_curry(True))
op.add_option('-N', '--no-daemon', dest="go_daemon",
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 51c26c76116..dfe65fe6709 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -14,11 +14,12 @@ import time
import stat
import json
import logging
+import fcntl
import socket
import string
import errno
import tarfile
-from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN
from threading import Condition, Lock
from datetime import datetime
from gconf import gconf
@@ -452,8 +453,40 @@ class GMasterCommon(object):
t = Thread(target=keep_alive)
t.start()
- def should_crawl(cls):
- return gconf.glusterd_uuid in cls.master.server.node_uuid()
+ def mgmt_lock(self):
+ """Take management volume lock """
+ bname = str(gconf.volume_id) + "_subvol_" + str(gconf.subvol_num) \
+ + ".lock"
+ path = os.path.join(gconf.working_dir, gconf.meta_volume, bname)
+ logging.debug("lock_file_path: %s" % path)
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ try:
+ fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except:
+ ex = sys.exc_info()[1]
+ os.close(fd)
+ if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+ # cannot grab, it's taken
+ logging.debug("Lock held by someother worker process")
+ return False
+ raise
+ logging.debug("Got the lock")
+ return True
+
+
+ def should_crawl(self):
+ if not gconf.meta_volume:
+ return gconf.glusterd_uuid in self.master.server.node_uuid()
+
+ mgmt_mnt = os.path.join(gconf.working_dir, gconf.meta_volume)
+ if not os.path.ismount(mgmt_mnt):
+ po = Popen(["mount", "-t", "glusterfs", "localhost:%s"
+ % gconf.meta_volume, mgmt_mnt], stdout=PIPE,
+ stderr=PIPE)
+ po.wait()
+ po.terminate_geterr()
+ return self.mgmt_lock()
+
def register(self):
self.register()
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 5e0698b8c46..e50893c793f 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -18,6 +18,7 @@ import xml.etree.ElementTree as XET
from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
+from errno import EEXIST
import re
import random
from gconf import gconf
@@ -29,6 +30,16 @@ from syncdutils import escape, Thread, finalize, memoize
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
+def get_subvol_num(brick_idx, replica_count, disperse_count):
+ subvol_size = disperse_count if disperse_count > 0 else replica_count
+ cnt = int((brick_idx + 1) / subvol_size)
+ rem = (brick_idx + 1) % subvol_size
+ if rem > 0:
+ return cnt + 1
+ else:
+ return cnt
+
+
def get_slave_bricks_status(host, vol):
po = Popen(['gluster', '--xml', '--remote-host=' + host,
'volume', 'status', vol, "detail"],
@@ -104,6 +115,15 @@ class Volinfo(object):
self.volume, self.host)
return ids[0].text
+ @property
+ @memoize
+ def replica_count(self):
+ return int(self.get('replicaCount')[0].text)
+
+ @property
+ @memoize
+ def disperse_count(self):
+ return int(self.get('disperseCount')[0].text)
class Monitor(object):
@@ -154,6 +174,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
+
def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
"""the monitor loop
@@ -247,6 +268,7 @@ class Monitor(object):
'--rpc-fd',
','.join([str(rw), str(ww),
str(ra), str(wa)]),
+ '--subvol-num', str(w[2]),
'--resource-remote',
remote_host])
@@ -374,7 +396,8 @@ def distribute(*resources):
else:
slaves = slavevols
- workerspex = [(brick['dir'], slaves[idx % len(slaves)])
+ workerspex = [(brick['dir'], slaves[idx % len(slaves)],
+ get_subvol_num(idx, mvol.replica_count, mvol.disperse_count))
for idx, brick in enumerate(mvol.bricks)
if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
@@ -382,6 +405,26 @@ def distribute(*resources):
def monitor(*resources):
+ # Mount geo-rep management volume
+ if gconf.meta_volume:
+ mgmt_mnt = os.path.join(gconf.working_dir, gconf.meta_volume)
+ if not os.path.exists(mgmt_mnt):
+ try:
+ os.makedirs(mgmt_mnt)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST:
+ pass
+ else:
+ raise
+
+ if not os.path.ismount(mgmt_mnt):
+ po = Popen(["mount", "-t", "glusterfs", "localhost:%s"
+ % gconf.meta_volume, mgmt_mnt], stdout=PIPE,
+ stderr=PIPE)
+ po.wait()
+ po.terminate_geterr()
+
# Check if gsyncd restarted in pause state. If
# yes, send SIGSTOP to negative of monitor pid
# to go back to pause state.
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 6af957ddb4a..5037004d9a5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -16,6 +16,7 @@ import fcntl
import shutil
import logging
import socket
+from subprocess import Popen, PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode
@@ -215,6 +216,9 @@ def finalize(*a, **kw):
except:
if sys.exc_info()[0] == OSError:
pass
+
+ # TODO: Clean up mgmt volume mount point only monitor dies
+
if gconf.log_exit:
logging.info("exiting.")
sys.stdout.flush()