From e40556bdf8fd27e35878a8c1942573da46204b7e Mon Sep 17 00:00:00 2001 From: Shireesh Anjal Date: Fri, 25 Mar 2011 15:43:32 +0530 Subject: Server agent and transport agent --- .../src/server/RemoteExecute.py | 287 +++++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py (limited to 'com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py') diff --git a/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py b/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py new file mode 100644 index 00000000..1800234f --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py @@ -0,0 +1,287 @@ +# Copyright (C) 2010 Gluster, Inc. +# This file is part of Gluster Storage Platform. +# +# Gluster Storage Platform is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 of +# the License, or (at your option) any later version. +# +# Gluster Storage Platform is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# . + +import os +import socket +#import paramiko +import syslog +import sys +import Socket +import Globals +from copy import deepcopy +from ServerUtils import * + +SERVER_AGENT_COMMAND = "/usr/sbin/server-agent" +SERVER_AGENT_CLEANUP_COMMAND = SERVER_AGENT_COMMAND + " --cleanup" +SERVER_AGENT_PRE_COMMAND = SERVER_AGENT_COMMAND + " --pre" +SERVER_AGENT_POST_COMMAND = SERVER_AGENT_COMMAND + " --post" +TRANSPORT_USER_NAME = "transport" +TRANSPORT_PRIVATE_KEY_FILE = Globals.TRANSPORT_HOME_DIR + "/.ssh/id_rsa" + +def remoteExecute(serverList, command, commandInput=None): + print "REMOTE:", serverList + statusDict = {} + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + privateKey = paramiko.RSAKey.from_private_key_file(TRANSPORT_PRIVATE_KEY_FILE) + except IOError: + log(syslog.LOG_ERR, "Private key file %s not found" % TRANSPORT_PRIVATE_KEY_FILE) + return None + print "STAGE1" + for serverName in serverList.keys(): + serverStatus = {} + serverStatus["ConnectionStatus"] = None + serverStatus["ExecutionStatus"] = None + serverStatus["StdOutString"] = None + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = None + serverStatus["Error"] = None + + isConnected = False + for serverIp in serverList[serverName]: + try: + ssh.connect(serverIp, username=TRANSPORT_USER_NAME, pkey=privateKey) + isConnected = True + break + except socket.error: + log(syslog.LOG_ERR, "Server %s:%s is inaccessible" % (serverName, serverIp)) + continue + if not isConnected: + serverStatus["ConnectionStatus"] = "inaccessible" + statusDict[serverName] = serverStatus + continue + + try: + transport = ssh.get_transport() + channel = transport.open_session() + serverStatus["ConnectionStatus"] = True + channel.exec_command(command) + stdin = channel.makefile('wb') + stdout = channel.makefile('rb') + stderr = channel.makefile_stderr('rb') + if commandInput: + stdin.write(commandInput) + channel.shutdown_write() + + returnValue = channel.recv_exit_status() ## this is blocking call + serverStatus["ExecutionStatus"] = returnValue + print "RRRRRRRRRRRRRRRR:", returnValue + errorString = "" + if -1 == returnValue: + errorString = stderr.read() + serverStatus["StdErrString"] = errorString + if "bash: " + command.split()[0] + ": command not found\n" == errorString: + log(syslog.LOG_ERR, "command %s not found in server %s" % (command, serverName)) + serverStatus["Error"] = "Command not found" + else: + serverStatus["StdErrString"] = stderr.read() + serverStatus["StdOutString"] = stdout.read() + ssh.close() + except paramiko.SSHException: + # Channel error (channel not open) + log(syslog.LOG_ERR, "Server %s:%s connection aborted" % (serverName, serverIp)) + serverStatus["ConnectionStatus"] = "aborted" + except socket.error: + log(syslog.LOG_ERR, "Server %s:%s is inaccessible" % (serverName, serverIp)) + serverStatus["ConnectionStatus"] = "inaccessible" + except paramiko.AuthenticationException: + log(syslog.LOG_ERR, "Authentication error on server %s:%s of user %s" % + (serverName, serverIp, TRANSPORT_USER_NAME)) + serverStatus["ConnectionStatus"] = "authentication error" + serverStatus["ConnectedIp"] = serverIp + statusDict[serverName] = serverStatus + return statusDict + +def cleanupExecuteSsh(serverList, requestDom): + return remoteExecute(serverList, SERVER_AGENT_CLEANUP_COMMAND, requestDom.toxml()) + +def executeRequestCommandSsh(serverList, command, requestDom, cleanupFlag): + cleanupStatusDict = {} + successStatusDict = {} + failureServerList = {} + cleanupServerList = {} + serverList = deepcopy(serverList) + statusDict = remoteExecute(serverList, command, requestDom.toxml()) + for serverName in statusDict.keys(): + statusDict["Response"] = None + if statusDict[serverName]["ConnectionStatus"] == True: + setLastAccessedNetwork(serverName, statusDict[serverName]["ConnectedIp"]) + if statusDict[serverName]["ConnectedIp"]: + ipList = serverList[serverName] + ipList.remove(statusDict[serverName]["ConnectedIp"]) + cleanupServerList[serverName] = [statusDict[serverName]["ConnectedIp"]] + ipList + if statusDict[serverName]["ExecutionStatus"] != 0: + failureServerList[serverName] = statusDict[serverName] + continue + responseDom = XDOM() + if not responseDom.parseString(statusDict[serverName]["StdOutString"]): + failureServerList[serverName] = statusDict[serverName] + continue + statusDict["Response"] = responseDom + if "OK" != responseDom.getAttribute("response-code"): + failureServerList[serverName] = statusDict[serverName] + continue + successStatusDict[serverName] = statusDict[serverName] + if cleanupFlag and failureServerList: + cleanupStatusDict = remoteExecute(cleanupServerList, SERVER_AGENT_CLEANUP_COMMAND, requestDom.toxml()) + return successStatusDict, failureServerList, cleanupStatusDict + +def preExecuteSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_PRE_COMMAND, requestDom, cleanupFlag) + +def executeSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_COMMAND, requestDom, cleanupFlag) + +def postExecuteSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_POST_COMMAND, requestDom, cleanupFlag) + +def runPullUpdatesDir(sourceServerIp, destServerIpList): + command = "/usr/sbin/pull-dir.sh %s %s %s" % (sourceServerIp, + Globals.UPDATES_DIR[1:], + Globals.UPDATES_DIR) + statusDict = remoteExecute(destServerIpList, command) + status = True + for serverName in statusDict.keys(): + if statusDict[serverName]["ExecutionStatus"] != 0: + log(syslog.LOG_ERR, "Failed to execute [%s] in server %s" % (command, serverName)) + status = False + return status + +def runPullGlusterDir(sourceServerIp, destServerIpList): + command = "/usr/sbin/pull-dir.sh %s %s %s" % (sourceServerIp, + Globals.GLUSTER_BASE_DIR[1:], + Globals.GLUSTER_BASE_DIR) + statusDict = remoteExecute(destServerIpList, command) + status = True + for serverName in statusDict.keys(): + if statusDict[serverName]["ExecutionStatus"] != 0: + log(syslog.LOG_ERR, "Failed to execute [%s] in server %s" % (command, serverName)) + status = False + return status + +def syncConfiguration(syncToInstaller=False, sourceServerIpList=None): + thisServerName = getCurrentServerName() + serverList = getAllServerList() + serverList.remove(thisServerName) + serverIpList = getExecuteServerList(serverList) + if syncToInstaller: + installerIp = getInstallerIp() + if not installerIp: + log(syslog.LOG_ERR, "Installer IP address is not found") + return False + serverIpList[Globals.INSTALLER_SERVER_NAME] = [installerIp] + + if not serverIpList: + log(syslog.LOG_ERR, "No servers found for sync configuration") + return False + + signature = generateSignature() + if not storeSignature(signature, Globals.SIGNATURE_FILE): + log(syslog.LOG_ERR, "failed to store signature %s to %s file" % + (signature, Globals.SIGNATURE_FILE)) + return False + + thisServerIpList = getExecuteServerList([thisServerName]) + if sourceServerIpList: + thisServerIpList = sourceServerIpList + return runPullGlusterDir(thisServerIpList[thisServerName][0], serverIpList) + +def remoteExecuteTcp(serverIpList, requestString): + serverStatus = {} + serverStatus["ConnectionStatus"] = False + serverStatus["ExecutionStatus"] = -1 + serverStatus["StdOutString"] = None + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = None + serverStatus["Error"] = None + + for ipAddress in serverIpList.values()[0]: + try: + sock, inputStream, outputStream = Socket.connectToServer(ipAddress) + Socket.writePacket(outputStream, requestString) + packetString = Socket.readPacket(inputStream) + log('__DEBUG__ Received: %s' % repr(packetString)) + sock.close() + serverStatus["ConnectionStatus"] = True + serverStatus["ExecutionStatus"] = 0 + serverStatus["StdOutString"] = packetString + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = ipAddress + serverStatus["Error"] = None + return serverStatus + except socket.error, e: + log("socket error on [%s:%s]: %s" % (serverIpList.keys()[0], ipAddress, str(e))) + return serverStatus + +def executeRequestCommand(serverList, command, requestDom, cleanupFlag): + cleanupStatusDict = {} + successStatusDict = {} + failureServerList = {} + cleanupServerList = {} + serverList = deepcopy(serverList) + + statusDict = {} + for serverName in serverList.keys(): + serverStatus = remoteExecuteTcp({serverName : serverList[serverName]}, requestDom.toxml()) + statusDict[serverName] = serverStatus + for serverName in statusDict.keys(): + statusDict["Response"] = None + if statusDict[serverName]["ConnectionStatus"] == True: + setLastAccessedNetwork(serverName, statusDict[serverName]["ConnectedIp"]) + if statusDict[serverName]["ConnectedIp"]: + ipList = serverList[serverName] + ipList.remove(statusDict[serverName]["ConnectedIp"]) + cleanupServerList[serverName] = [statusDict[serverName]["ConnectedIp"]] + ipList + if statusDict[serverName]["ExecutionStatus"] != 0: + failureServerList[serverName] = statusDict[serverName] + continue + responseDom = XDOM() + if not responseDom.parseString(statusDict[serverName]["StdOutString"]): + failureServerList[serverName] = statusDict[serverName] + continue + statusDict["Response"] = responseDom + if "OK" != responseDom.getResponseCode(): + failureServerList[serverName] = statusDict[serverName] + continue + successStatusDict[serverName] = statusDict[serverName] + if cleanupFlag and failureServerList: + rq = deepcopy(requestDom) + rq.setRequestAction("cleanup") + cleanupStatusDict = {} + for serverName in cleanupServerList.keys(): + serverStatus = remoteExecuteTcp({serverName : cleanupServerList[serverName]}, rq.toxml()) + cleanupStatusDict[serverName] = serverStatus + return successStatusDict, failureServerList, cleanupStatusDict + +def preExecute(serverList, requestDom, cleanupFlag=True): + rq = deepcopy(requestDom) + rq.setRequestAction("pre") + return executeRequestCommand(serverList, SERVER_AGENT_PRE_COMMAND, rq, cleanupFlag) + +def execute(serverList, requestDom, cleanupFlag=True): + return executeRequestCommand(serverList, SERVER_AGENT_COMMAND, requestDom, cleanupFlag) + +def postExecute(serverList, requestDom, cleanupFlag=True): + rq = deepcopy(requestDom) + rq.setRequestAction("post") + return executeRequestCommand(serverList, SERVER_AGENT_POST_COMMAND, rq, cleanupFlag) + +def cleanupExecute(serverList, requestDom): + rq = deepcopy(requestDom) + rq.setRequestAction("cleanup") + return executeRequestCommand(serverList, SERVER_AGENT_CLEANUP_COMMAND, rq, False) -- cgit