# # Copyright 2014 Red Hat, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # # Refer to the README and COPYING files for full details of the license # ## most of the code is copied from vdsm project import logging import subprocess from cpopen import CPopen import io import select import threading from StringIO import StringIO from weakref import proxy import time import os import errno import signal class CommandPath(object): def __init__(self, name, *args): self.name = name self.paths = args self._cmd = None @property def cmd(self): if not self._cmd: for path in self.paths: if os.path.exists(path): self._cmd = path break else: raise OSError(os.errno.ENOENT, os.strerror(os.errno.ENOENT) + ': ' + self.name) return self._cmd def __repr__(self): return str(self.cmd) def __str__(self): return str(self.cmd) def __unicode__(self): return unicode(self.cmd) ioniceCmdPath = CommandPath("ionice", "/bin/ionice", ) killCmdPath = CommandPath("kill", "/bin/kill", ) niceCmdPath = CommandPath("nice", "/bin/nice", ) setsidCmdPath = CommandPath("setsid", "/bin/setsid", ) sudoCmdPath = CommandPath("sudo", "/bin/sudo", ) # Buffsize is 1K because I tested it on some use cases and 1K was fastest. If # you find this number to be a bottleneck in any way you are welcome to change # it BUFFSIZE = 1024 SUDO_NON_INTERACTIVE_FLAG = "-n" # NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll. # We could do so defining a new object that can be used as a placeholer # for the changing timeout value in the *args/**kwargs. This would # lead us to rebuilding the function arguments at each loop. def NoIntrPoll(pollfun, timeout=-1): """ This wrapper is used to handle the interrupt exceptions that might occur during a poll system call. The wrapped function must be defined as poll([timeout]) where the special timeout value 0 is used to return immediately and -1 is used to wait indefinitely. """ # When the timeout < 0 we shouldn't compute a new timeout after an # interruption. endtime = None if timeout < 0 else time.time() + timeout while True: try: return pollfun(timeout) except (IOError, select.error) as e: if e.args[0] != errno.EINTR: raise if endtime is not None: timeout = max(0, endtime - time.time()) class AsyncProc(object): """ AsyncProc is a funky class. It wraps a standard subprocess.Popen Object and gives it super powers. Like the power to read from a stream without the fear of deadlock. It does this by always sampling all stream while waiting for data. By doing this the other process can freely write data to all stream without the fear of it getting stuck writing to a full pipe. """ class _streamWrapper(io.RawIOBase): def __init__(self, parent, streamToWrap, fd): io.IOBase.__init__(self) self._stream = streamToWrap self._parent = proxy(parent) self._fd = fd self._closed = False def close(self): if not self._closed: self._closed = True while not self._streamClosed: self._parent._processStreams() @property def closed(self): return self._closed @property def _streamClosed(self): return (self.fileno() in self._parent._closedfds) def fileno(self): return self._fd def seekable(self): return False def readable(self): return True def writable(self): return True def _readNonBlock(self, length): hasNewData = (self._stream.len - self._stream.pos) if hasNewData < length and not self._streamClosed: self._parent._processStreams() with self._parent._streamLock: res = self._stream.read(length) if self._stream.pos == self._stream.len: self._stream.truncate(0) if res == "" and not self._streamClosed: return None else: return res def read(self, length): if not self._parent.blocking: return self._readNonBlock(length) else: res = None while res is None: res = self._readNonBlock(length) return res def readinto(self, b): data = self.read(len(b)) if data is None: return None bytesRead = len(data) b[:bytesRead] = data return bytesRead def write(self, data): if hasattr(data, "tobytes"): data = data.tobytes() with self._parent._streamLock: oldPos = self._stream.pos self._stream.pos = self._stream.len self._stream.write(data) self._stream.pos = oldPos while self._stream.len > 0 and not self._streamClosed: self._parent._processStreams() if self._streamClosed: self._closed = True if self._stream.len != 0: raise IOError(errno.EPIPE, "Could not write all data to stream") return len(data) def __init__(self, popenToWrap): self._streamLock = threading.Lock() self._proc = popenToWrap self._stdout = StringIO() self._stderr = StringIO() self._stdin = StringIO() fdout = self._proc.stdout.fileno() fderr = self._proc.stderr.fileno() self._fdin = self._proc.stdin.fileno() self._closedfds = [] self._poller = select.epoll() self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI) self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI) self._poller.register(self._fdin, 0) self._fdMap = {fdout: self._stdout, fderr: self._stderr, self._fdin: self._stdin} self.stdout = io.BufferedReader(self._streamWrapper(self, self._stdout, fdout), BUFFSIZE) self.stderr = io.BufferedReader(self._streamWrapper(self, self._stderr, fderr), BUFFSIZE) self.stdin = io.BufferedWriter(self._streamWrapper(self, self._stdin, self._fdin), BUFFSIZE) self._returncode = None self.blocking = False def _processStreams(self): if len(self._closedfds) == 3: return if not self._streamLock.acquire(False): self._streamLock.acquire() self._streamLock.release() return try: if self._stdin.len > 0 and self._stdin.pos == 0: # Polling stdin is redundant if there is nothing to write # turn on only if data is waiting to be pushed self._poller.modify(self._fdin, select.EPOLLOUT) pollres = NoIntrPoll(self._poller.poll, 1) for fd, event in pollres: stream = self._fdMap[fd] if event & select.EPOLLOUT and self._stdin.len > 0: buff = self._stdin.read(BUFFSIZE) written = os.write(fd, buff) stream.pos -= len(buff) - written if stream.pos == stream.len: stream.truncate(0) self._poller.modify(fd, 0) elif event & (select.EPOLLIN | select.EPOLLPRI): data = os.read(fd, BUFFSIZE) oldpos = stream.pos stream.pos = stream.len stream.write(data) stream.pos = oldpos elif event & (select.EPOLLHUP | select.EPOLLERR): self._poller.unregister(fd) self._closedfds.append(fd) # I don't close the fd because the original Popen # will do it. if self.stdin.closed and self._fdin not in self._closedfds: self._poller.unregister(self._fdin) self._closedfds.append(self._fdin) self._proc.stdin.close() finally: self._streamLock.release() @property def pid(self): return self._proc.pid @property def returncode(self): if self._returncode is None: self._returncode = self._proc.poll() return self._returncode def kill(self): try: self._proc.kill() except OSError as ex: if ex.errno != errno.EPERM: raise execCmd([killCmdPath.cmd, "-%d" % (signal.SIGTERM,), str(self.pid)], sudo=True) def wait(self, timeout=None, cond=None): startTime = time.time() while self.returncode is None: if timeout is not None and (time.time() - startTime) > timeout: return False if cond is not None and cond(): return False self._processStreams() return True def communicate(self, data=None): if data is not None: self.stdin.write(data) self.stdin.flush() self.stdin.close() self.wait() return "".join(self.stdout), "".join(self.stderr) def __del__(self): self._poller.close() def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True, printable=None, env=None, sync=True, nice=None, ioclass=None, ioclassdata=None, setsid=False, execCmdLogger=logging.root, deathSignal=0, childUmask=None): """ Executes an external command, optionally via sudo. IMPORTANT NOTE: the new process would receive `deathSignal` when the controlling thread dies, which may not be what you intended: if you create a temporary thread, spawn a sync=False sub-process, and have the thread finish, the new subprocess would die immediately. """ if ioclass is not None: cmd = command command = [ioniceCmdPath.cmd, '-c', str(ioclass)] if ioclassdata is not None: command.extend(("-n", str(ioclassdata))) command = command + cmd if nice is not None: command = [niceCmdPath.cmd, '-n', str(nice)] + command if setsid: command = [setsidCmdPath.cmd] + command if sudo: command = [sudoCmdPath.cmd, SUDO_NON_INTERACTIVE_FLAG] + command if not printable: printable = command cmdline = repr(subprocess.list2cmdline(printable)) execCmdLogger.debug("%s (cwd %s)", cmdline, cwd) p = CPopen(command, close_fds=True, cwd=cwd, env=env, deathSignal=deathSignal, childUmask=childUmask) p = AsyncProc(p) if not sync: if data is not None: p.stdin.write(data) p.stdin.flush() return p (out, err) = p.communicate(data) if out is None: # Prevent splitlines() from barfing later on out = "" execCmdLogger.debug("%s: = %s; = %d", {True: "SUCCESS", False: "FAILED"}[p.returncode == 0], repr(err), p.returncode) if not raw: out = out.splitlines(False) err = err.splitlines(False) return (p.returncode, out, err) def retry(func, expectedException=Exception, tries=None, timeout=None, sleep=1, stopCallback=None): """ Retry a function. Wraps the retry logic so you don't have to implement it each time you need it. :param func: The callable to run. :param expectedException: The exception you expect to receive when the function fails. :param tries: The number of times to try. None\0,-1 means infinite. :param timeout: The time you want to spend waiting. This **WILL NOT** stop the method. It will just not run it if it ended after the timeout. :param sleep: Time to sleep between calls in seconds. :param stopCallback: A function that takes no parameters and causes the method to stop retrying when it returns with a positive value. """ if tries in [0, None]: tries = -1 if timeout in [0, None]: timeout = -1 startTime = time.time() while True: tries -= 1 try: return func() except expectedException: if tries == 0: raise if (timeout > 0) and ((time.time() - startTime) > timeout): raise if stopCallback is not None and stopCallback(): raise time.sleep(sleep)