From 8f5f4537af7790be386974628c804a7bc719b738 Mon Sep 17 00:00:00 2001 From: "Bala.FA" Date: Fri, 7 Mar 2014 18:28:09 +0530 Subject: Initial commit Change-Id: Ie8fdd046d111a4a46abe0e162985e833323bfd7d Signed-off-by: Bala.FA --- glusternagios/Makefile.am | 4 + glusternagios/__init__.py | 19 ++ glusternagios/utils.py | 438 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 461 insertions(+) create mode 100644 glusternagios/Makefile.am create mode 100644 glusternagios/__init__.py create mode 100644 glusternagios/utils.py (limited to 'glusternagios') diff --git a/glusternagios/Makefile.am b/glusternagios/Makefile.am new file mode 100644 index 0000000..55f8642 --- /dev/null +++ b/glusternagios/Makefile.am @@ -0,0 +1,4 @@ +dist_glusternagioscommonpylib_PYTHON = \ + __init__.py \ + utils.py \ + $(NULL) diff --git a/glusternagios/__init__.py b/glusternagios/__init__.py new file mode 100644 index 0000000..ff21495 --- /dev/null +++ b/glusternagios/__init__.py @@ -0,0 +1,19 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# diff --git a/glusternagios/utils.py b/glusternagios/utils.py new file mode 100644 index 0000000..b4d77c6 --- /dev/null +++ b/glusternagios/utils.py @@ -0,0 +1,438 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +## most of the code is copied from vdsm project + +import logging +import subprocess +from cpopen import CPopen +import io +import select +import threading +from StringIO import StringIO +from weakref import proxy +import time +import os +import errno +import signal + + +class CommandPath(object): + def __init__(self, name, *args): + self.name = name + self.paths = args + self._cmd = None + + @property + def cmd(self): + if not self._cmd: + for path in self.paths: + if os.path.exists(path): + self._cmd = path + break + else: + raise OSError(os.errno.ENOENT, + os.strerror(os.errno.ENOENT) + ': ' + self.name) + return self._cmd + + def __repr__(self): + return str(self.cmd) + + def __str__(self): + return str(self.cmd) + + def __unicode__(self): + return unicode(self.cmd) + + +ioniceCmdPath = CommandPath("ionice", + "/bin/ionice", + ) +killCmdPath = CommandPath("kill", + "/bin/kill", + ) +niceCmdPath = CommandPath("nice", + "/bin/nice", + ) +setsidCmdPath = CommandPath("setsid", + "/bin/setsid", + ) +sudoCmdPath = CommandPath("sudo", + "/bin/sudo", + ) +# Buffsize is 1K because I tested it on some use cases and 1K was fastest. If +# you find this number to be a bottleneck in any way you are welcome to change +# it +BUFFSIZE = 1024 +SUDO_NON_INTERACTIVE_FLAG = "-n" + + +# NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll. +# We could do so defining a new object that can be used as a placeholer +# for the changing timeout value in the *args/**kwargs. This would +# lead us to rebuilding the function arguments at each loop. +def NoIntrPoll(pollfun, timeout=-1): + """ + This wrapper is used to handle the interrupt exceptions that might + occur during a poll system call. The wrapped function must be defined + as poll([timeout]) where the special timeout value 0 is used to return + immediately and -1 is used to wait indefinitely. + """ + # When the timeout < 0 we shouldn't compute a new timeout after an + # interruption. + endtime = None if timeout < 0 else time.time() + timeout + + while True: + try: + return pollfun(timeout) + except (IOError, select.error) as e: + if e.args[0] != errno.EINTR: + raise + + if endtime is not None: + timeout = max(0, endtime - time.time()) + + +class AsyncProc(object): + """ + AsyncProc is a funky class. It wraps a standard subprocess.Popen + Object and gives it super powers. Like the power to read from a stream + without the fear of deadlock. It does this by always sampling all + stream while waiting for data. By doing this the other process can freely + write data to all stream without the fear of it getting stuck writing + to a full pipe. + """ + class _streamWrapper(io.RawIOBase): + def __init__(self, parent, streamToWrap, fd): + io.IOBase.__init__(self) + self._stream = streamToWrap + self._parent = proxy(parent) + self._fd = fd + self._closed = False + + def close(self): + if not self._closed: + self._closed = True + while not self._streamClosed: + self._parent._processStreams() + + @property + def closed(self): + return self._closed + + @property + def _streamClosed(self): + return (self.fileno() in self._parent._closedfds) + + def fileno(self): + return self._fd + + def seekable(self): + return False + + def readable(self): + return True + + def writable(self): + return True + + def _readNonBlock(self, length): + hasNewData = (self._stream.len - self._stream.pos) + if hasNewData < length and not self._streamClosed: + self._parent._processStreams() + + with self._parent._streamLock: + res = self._stream.read(length) + if self._stream.pos == self._stream.len: + self._stream.truncate(0) + + if res == "" and not self._streamClosed: + return None + else: + return res + + def read(self, length): + if not self._parent.blocking: + return self._readNonBlock(length) + else: + res = None + while res is None: + res = self._readNonBlock(length) + + return res + + def readinto(self, b): + data = self.read(len(b)) + if data is None: + return None + + bytesRead = len(data) + b[:bytesRead] = data + + return bytesRead + + def write(self, data): + if hasattr(data, "tobytes"): + data = data.tobytes() + with self._parent._streamLock: + oldPos = self._stream.pos + self._stream.pos = self._stream.len + self._stream.write(data) + self._stream.pos = oldPos + + while self._stream.len > 0 and not self._streamClosed: + self._parent._processStreams() + + if self._streamClosed: + self._closed = True + + if self._stream.len != 0: + raise IOError(errno.EPIPE, + "Could not write all data to stream") + + return len(data) + + def __init__(self, popenToWrap): + self._streamLock = threading.Lock() + self._proc = popenToWrap + + self._stdout = StringIO() + self._stderr = StringIO() + self._stdin = StringIO() + + fdout = self._proc.stdout.fileno() + fderr = self._proc.stderr.fileno() + self._fdin = self._proc.stdin.fileno() + + self._closedfds = [] + + self._poller = select.epoll() + self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI) + self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI) + self._poller.register(self._fdin, 0) + self._fdMap = {fdout: self._stdout, + fderr: self._stderr, + self._fdin: self._stdin} + + self.stdout = io.BufferedReader(self._streamWrapper(self, + self._stdout, fdout), BUFFSIZE) + + self.stderr = io.BufferedReader(self._streamWrapper(self, + self._stderr, fderr), BUFFSIZE) + + self.stdin = io.BufferedWriter(self._streamWrapper(self, + self._stdin, self._fdin), BUFFSIZE) + + self._returncode = None + + self.blocking = False + + def _processStreams(self): + if len(self._closedfds) == 3: + return + + if not self._streamLock.acquire(False): + self._streamLock.acquire() + self._streamLock.release() + return + try: + if self._stdin.len > 0 and self._stdin.pos == 0: + # Polling stdin is redundant if there is nothing to write + # turn on only if data is waiting to be pushed + self._poller.modify(self._fdin, select.EPOLLOUT) + + pollres = NoIntrPoll(self._poller.poll, 1) + + for fd, event in pollres: + stream = self._fdMap[fd] + if event & select.EPOLLOUT and self._stdin.len > 0: + buff = self._stdin.read(BUFFSIZE) + written = os.write(fd, buff) + stream.pos -= len(buff) - written + if stream.pos == stream.len: + stream.truncate(0) + self._poller.modify(fd, 0) + + elif event & (select.EPOLLIN | select.EPOLLPRI): + data = os.read(fd, BUFFSIZE) + oldpos = stream.pos + stream.pos = stream.len + stream.write(data) + stream.pos = oldpos + + elif event & (select.EPOLLHUP | select.EPOLLERR): + self._poller.unregister(fd) + self._closedfds.append(fd) + # I don't close the fd because the original Popen + # will do it. + + if self.stdin.closed and self._fdin not in self._closedfds: + self._poller.unregister(self._fdin) + self._closedfds.append(self._fdin) + self._proc.stdin.close() + + finally: + self._streamLock.release() + + @property + def pid(self): + return self._proc.pid + + @property + def returncode(self): + if self._returncode is None: + self._returncode = self._proc.poll() + return self._returncode + + def kill(self): + try: + self._proc.kill() + except OSError as ex: + if ex.errno != errno.EPERM: + raise + execCmd([killCmdPath.cmd, "-%d" % (signal.SIGTERM,), + str(self.pid)], sudo=True) + + def wait(self, timeout=None, cond=None): + startTime = time.time() + while self.returncode is None: + if timeout is not None and (time.time() - startTime) > timeout: + return False + if cond is not None and cond(): + return False + self._processStreams() + return True + + def communicate(self, data=None): + if data is not None: + self.stdin.write(data) + self.stdin.flush() + self.stdin.close() + + self.wait() + return "".join(self.stdout), "".join(self.stderr) + + def __del__(self): + self._poller.close() + + +def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True, + printable=None, env=None, sync=True, nice=None, ioclass=None, + ioclassdata=None, setsid=False, execCmdLogger=logging.root, + deathSignal=0, childUmask=None): + """ + Executes an external command, optionally via sudo. + + IMPORTANT NOTE: the new process would receive `deathSignal` when the + controlling thread dies, which may not be what you intended: if you create + a temporary thread, spawn a sync=False sub-process, and have the thread + finish, the new subprocess would die immediately. + """ + if ioclass is not None: + cmd = command + command = [ioniceCmdPath.cmd, '-c', str(ioclass)] + if ioclassdata is not None: + command.extend(("-n", str(ioclassdata))) + + command = command + cmd + + if nice is not None: + command = [niceCmdPath.cmd, '-n', str(nice)] + command + + if setsid: + command = [setsidCmdPath.cmd] + command + + if sudo: + command = [sudoCmdPath.cmd, SUDO_NON_INTERACTIVE_FLAG] + command + + if not printable: + printable = command + + cmdline = repr(subprocess.list2cmdline(printable)) + execCmdLogger.debug("%s (cwd %s)", cmdline, cwd) + + p = CPopen(command, close_fds=True, cwd=cwd, env=env, + deathSignal=deathSignal, childUmask=childUmask) + p = AsyncProc(p) + if not sync: + if data is not None: + p.stdin.write(data) + p.stdin.flush() + + return p + + (out, err) = p.communicate(data) + + if out is None: + # Prevent splitlines() from barfing later on + out = "" + + execCmdLogger.debug("%s: = %s; = %d", + {True: "SUCCESS", False: "FAILED"}[p.returncode == 0], + repr(err), p.returncode) + + if not raw: + out = out.splitlines(False) + err = err.splitlines(False) + + return (p.returncode, out, err) + + +def retry(func, expectedException=Exception, tries=None, + timeout=None, sleep=1, stopCallback=None): + """ + Retry a function. Wraps the retry logic so you don't have to + implement it each time you need it. + + :param func: The callable to run. + :param expectedException: The exception you expect to receive when the + function fails. + :param tries: The number of times to try. None\0,-1 means infinite. + :param timeout: The time you want to spend waiting. This **WILL NOT** stop + the method. It will just not run it if it ended after the + timeout. + :param sleep: Time to sleep between calls in seconds. + :param stopCallback: A function that takes no parameters and causes the + method to stop retrying when it returns with a + positive value. + """ + if tries in [0, None]: + tries = -1 + + if timeout in [0, None]: + timeout = -1 + + startTime = time.time() + + while True: + tries -= 1 + try: + return func() + except expectedException: + if tries == 0: + raise + + if (timeout > 0) and ((time.time() - startTime) > timeout): + raise + + if stopCallback is not None and stopCallback(): + raise + + time.sleep(sleep) -- cgit