diff options
Diffstat (limited to 'com.gluster.storage.management.server.scripts/src/server')
5 files changed, 548 insertions, 0 deletions
diff --git a/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py b/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py new file mode 100644 index 00000000..1800234f --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/RemoteExecute.py @@ -0,0 +1,287 @@ +# Copyright (C) 2010 Gluster, Inc. <http://www.gluster.com> +# This file is part of Gluster Storage Platform. +# +# Gluster Storage Platform is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 of +# the License, or (at your option) any later version. +# +# Gluster Storage Platform is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# <http://www.gnu.org/licenses/>. + +import os +import socket +#import paramiko +import syslog +import sys +import Socket +import Globals +from copy import deepcopy +from ServerUtils import * + +SERVER_AGENT_COMMAND = "/usr/sbin/server-agent" +SERVER_AGENT_CLEANUP_COMMAND = SERVER_AGENT_COMMAND + " --cleanup" +SERVER_AGENT_PRE_COMMAND = SERVER_AGENT_COMMAND + " --pre" +SERVER_AGENT_POST_COMMAND = SERVER_AGENT_COMMAND + " --post" +TRANSPORT_USER_NAME = "transport" +TRANSPORT_PRIVATE_KEY_FILE = Globals.TRANSPORT_HOME_DIR + "/.ssh/id_rsa" + +def remoteExecute(serverList, command, commandInput=None): + print "REMOTE:", serverList + statusDict = {} + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + privateKey = paramiko.RSAKey.from_private_key_file(TRANSPORT_PRIVATE_KEY_FILE) + except IOError: + log(syslog.LOG_ERR, "Private key file %s not found" % TRANSPORT_PRIVATE_KEY_FILE) + return None + print "STAGE1" + for serverName in serverList.keys(): + serverStatus = {} + serverStatus["ConnectionStatus"] = None + serverStatus["ExecutionStatus"] = None + serverStatus["StdOutString"] = None + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = None + serverStatus["Error"] = None + + isConnected = False + for serverIp in serverList[serverName]: + try: + ssh.connect(serverIp, username=TRANSPORT_USER_NAME, pkey=privateKey) + isConnected = True + break + except socket.error: + log(syslog.LOG_ERR, "Server %s:%s is inaccessible" % (serverName, serverIp)) + continue + if not isConnected: + serverStatus["ConnectionStatus"] = "inaccessible" + statusDict[serverName] = serverStatus + continue + + try: + transport = ssh.get_transport() + channel = transport.open_session() + serverStatus["ConnectionStatus"] = True + channel.exec_command(command) + stdin = channel.makefile('wb') + stdout = channel.makefile('rb') + stderr = channel.makefile_stderr('rb') + if commandInput: + stdin.write(commandInput) + channel.shutdown_write() + + returnValue = channel.recv_exit_status() ## this is blocking call + serverStatus["ExecutionStatus"] = returnValue + print "RRRRRRRRRRRRRRRR:", returnValue + errorString = "" + if -1 == returnValue: + errorString = stderr.read() + serverStatus["StdErrString"] = errorString + if "bash: " + command.split()[0] + ": command not found\n" == errorString: + log(syslog.LOG_ERR, "command %s not found in server %s" % (command, serverName)) + serverStatus["Error"] = "Command not found" + else: + serverStatus["StdErrString"] = stderr.read() + serverStatus["StdOutString"] = stdout.read() + ssh.close() + except paramiko.SSHException: + # Channel error (channel not open) + log(syslog.LOG_ERR, "Server %s:%s connection aborted" % (serverName, serverIp)) + serverStatus["ConnectionStatus"] = "aborted" + except socket.error: + log(syslog.LOG_ERR, "Server %s:%s is inaccessible" % (serverName, serverIp)) + serverStatus["ConnectionStatus"] = "inaccessible" + except paramiko.AuthenticationException: + log(syslog.LOG_ERR, "Authentication error on server %s:%s of user %s" % + (serverName, serverIp, TRANSPORT_USER_NAME)) + serverStatus["ConnectionStatus"] = "authentication error" + serverStatus["ConnectedIp"] = serverIp + statusDict[serverName] = serverStatus + return statusDict + +def cleanupExecuteSsh(serverList, requestDom): + return remoteExecute(serverList, SERVER_AGENT_CLEANUP_COMMAND, requestDom.toxml()) + +def executeRequestCommandSsh(serverList, command, requestDom, cleanupFlag): + cleanupStatusDict = {} + successStatusDict = {} + failureServerList = {} + cleanupServerList = {} + serverList = deepcopy(serverList) + statusDict = remoteExecute(serverList, command, requestDom.toxml()) + for serverName in statusDict.keys(): + statusDict["Response"] = None + if statusDict[serverName]["ConnectionStatus"] == True: + setLastAccessedNetwork(serverName, statusDict[serverName]["ConnectedIp"]) + if statusDict[serverName]["ConnectedIp"]: + ipList = serverList[serverName] + ipList.remove(statusDict[serverName]["ConnectedIp"]) + cleanupServerList[serverName] = [statusDict[serverName]["ConnectedIp"]] + ipList + if statusDict[serverName]["ExecutionStatus"] != 0: + failureServerList[serverName] = statusDict[serverName] + continue + responseDom = XDOM() + if not responseDom.parseString(statusDict[serverName]["StdOutString"]): + failureServerList[serverName] = statusDict[serverName] + continue + statusDict["Response"] = responseDom + if "OK" != responseDom.getAttribute("response-code"): + failureServerList[serverName] = statusDict[serverName] + continue + successStatusDict[serverName] = statusDict[serverName] + if cleanupFlag and failureServerList: + cleanupStatusDict = remoteExecute(cleanupServerList, SERVER_AGENT_CLEANUP_COMMAND, requestDom.toxml()) + return successStatusDict, failureServerList, cleanupStatusDict + +def preExecuteSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_PRE_COMMAND, requestDom, cleanupFlag) + +def executeSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_COMMAND, requestDom, cleanupFlag) + +def postExecuteSsh(serverList, requestDom, cleanupFlag=True): + return executeRequestCommandSsh(serverList, SERVER_AGENT_POST_COMMAND, requestDom, cleanupFlag) + +def runPullUpdatesDir(sourceServerIp, destServerIpList): + command = "/usr/sbin/pull-dir.sh %s %s %s" % (sourceServerIp, + Globals.UPDATES_DIR[1:], + Globals.UPDATES_DIR) + statusDict = remoteExecute(destServerIpList, command) + status = True + for serverName in statusDict.keys(): + if statusDict[serverName]["ExecutionStatus"] != 0: + log(syslog.LOG_ERR, "Failed to execute [%s] in server %s" % (command, serverName)) + status = False + return status + +def runPullGlusterDir(sourceServerIp, destServerIpList): + command = "/usr/sbin/pull-dir.sh %s %s %s" % (sourceServerIp, + Globals.GLUSTER_BASE_DIR[1:], + Globals.GLUSTER_BASE_DIR) + statusDict = remoteExecute(destServerIpList, command) + status = True + for serverName in statusDict.keys(): + if statusDict[serverName]["ExecutionStatus"] != 0: + log(syslog.LOG_ERR, "Failed to execute [%s] in server %s" % (command, serverName)) + status = False + return status + +def syncConfiguration(syncToInstaller=False, sourceServerIpList=None): + thisServerName = getCurrentServerName() + serverList = getAllServerList() + serverList.remove(thisServerName) + serverIpList = getExecuteServerList(serverList) + if syncToInstaller: + installerIp = getInstallerIp() + if not installerIp: + log(syslog.LOG_ERR, "Installer IP address is not found") + return False + serverIpList[Globals.INSTALLER_SERVER_NAME] = [installerIp] + + if not serverIpList: + log(syslog.LOG_ERR, "No servers found for sync configuration") + return False + + signature = generateSignature() + if not storeSignature(signature, Globals.SIGNATURE_FILE): + log(syslog.LOG_ERR, "failed to store signature %s to %s file" % + (signature, Globals.SIGNATURE_FILE)) + return False + + thisServerIpList = getExecuteServerList([thisServerName]) + if sourceServerIpList: + thisServerIpList = sourceServerIpList + return runPullGlusterDir(thisServerIpList[thisServerName][0], serverIpList) + +def remoteExecuteTcp(serverIpList, requestString): + serverStatus = {} + serverStatus["ConnectionStatus"] = False + serverStatus["ExecutionStatus"] = -1 + serverStatus["StdOutString"] = None + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = None + serverStatus["Error"] = None + + for ipAddress in serverIpList.values()[0]: + try: + sock, inputStream, outputStream = Socket.connectToServer(ipAddress) + Socket.writePacket(outputStream, requestString) + packetString = Socket.readPacket(inputStream) + log('__DEBUG__ Received: %s' % repr(packetString)) + sock.close() + serverStatus["ConnectionStatus"] = True + serverStatus["ExecutionStatus"] = 0 + serverStatus["StdOutString"] = packetString + serverStatus["StdErrString"] = None + serverStatus["ConnectedIp"] = ipAddress + serverStatus["Error"] = None + return serverStatus + except socket.error, e: + log("socket error on [%s:%s]: %s" % (serverIpList.keys()[0], ipAddress, str(e))) + return serverStatus + +def executeRequestCommand(serverList, command, requestDom, cleanupFlag): + cleanupStatusDict = {} + successStatusDict = {} + failureServerList = {} + cleanupServerList = {} + serverList = deepcopy(serverList) + + statusDict = {} + for serverName in serverList.keys(): + serverStatus = remoteExecuteTcp({serverName : serverList[serverName]}, requestDom.toxml()) + statusDict[serverName] = serverStatus + for serverName in statusDict.keys(): + statusDict["Response"] = None + if statusDict[serverName]["ConnectionStatus"] == True: + setLastAccessedNetwork(serverName, statusDict[serverName]["ConnectedIp"]) + if statusDict[serverName]["ConnectedIp"]: + ipList = serverList[serverName] + ipList.remove(statusDict[serverName]["ConnectedIp"]) + cleanupServerList[serverName] = [statusDict[serverName]["ConnectedIp"]] + ipList + if statusDict[serverName]["ExecutionStatus"] != 0: + failureServerList[serverName] = statusDict[serverName] + continue + responseDom = XDOM() + if not responseDom.parseString(statusDict[serverName]["StdOutString"]): + failureServerList[serverName] = statusDict[serverName] + continue + statusDict["Response"] = responseDom + if "OK" != responseDom.getResponseCode(): + failureServerList[serverName] = statusDict[serverName] + continue + successStatusDict[serverName] = statusDict[serverName] + if cleanupFlag and failureServerList: + rq = deepcopy(requestDom) + rq.setRequestAction("cleanup") + cleanupStatusDict = {} + for serverName in cleanupServerList.keys(): + serverStatus = remoteExecuteTcp({serverName : cleanupServerList[serverName]}, rq.toxml()) + cleanupStatusDict[serverName] = serverStatus + return successStatusDict, failureServerList, cleanupStatusDict + +def preExecute(serverList, requestDom, cleanupFlag=True): + rq = deepcopy(requestDom) + rq.setRequestAction("pre") + return executeRequestCommand(serverList, SERVER_AGENT_PRE_COMMAND, rq, cleanupFlag) + +def execute(serverList, requestDom, cleanupFlag=True): + return executeRequestCommand(serverList, SERVER_AGENT_COMMAND, requestDom, cleanupFlag) + +def postExecute(serverList, requestDom, cleanupFlag=True): + rq = deepcopy(requestDom) + rq.setRequestAction("post") + return executeRequestCommand(serverList, SERVER_AGENT_POST_COMMAND, rq, cleanupFlag) + +def cleanupExecute(serverList, requestDom): + rq = deepcopy(requestDom) + rq.setRequestAction("cleanup") + return executeRequestCommand(serverList, SERVER_AGENT_CLEANUP_COMMAND, rq, False) diff --git a/com.gluster.storage.management.server.scripts/src/server/RequestHandler.py b/com.gluster.storage.management.server.scripts/src/server/RequestHandler.py new file mode 100644 index 00000000..e6fe88ff --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/RequestHandler.py @@ -0,0 +1,58 @@ +import os +import glob +#import paramiko +import tempfile +#import uuid +import socket +import tarfile +import time +import Globals +import Commands +from Protocol import * +from RemoteExecute import * +from NetworkUtils import * + +def handleRequestGetServerNetworkConfig(requestDom): + messageId = requestDom.getAttribute("id") + serverName = requestDom.getTextByTagRoute("command.server-name") + version = requestDom.getVersion() + request = requestDom.getAttribute("request") + + if not serverName: + responseDom = ResponseXml(Commands.COMMAND_GET_SERVER_NETWORK_CONFIG, "No server name given", messageId, version) + responseDom.appendTagRoute("server.name", serverName) + return responseDom + + #serverIpList = getExecuteServerList([serverName]) + #if not serverIpList: + # responseDom = ResponseXml(Commands.COMMAND_GET_SERVER_NETWORK_CONFIG, "Unable to get server ip", messageId, version) + # responseDom.appendTagRoute("server.name", serverName) + # return responseDom + + successStatusDict, failureServerList, cleanupStatusDict = \ + execute({serverName:[serverName]}, requestDom, Globals.REQUEST_MAP[request]["cleanup"]) + if failureServerList: + response = failureServerList[serverName]["StdOutString"] + if not response: + return ResponseXml(Commands.COMMAND_GET_SERVER_NETWORK_CONFIG, + "Failed to execute get server network config", messageId, version) + responseDom = XDOM() + if responseDom.parseString(response): + return responseDom + errorResponseDom = ResponseXml(Commands.COMMAND_GET_SERVER_NETWORK_CONFIG, + "Invalid response of get server network config", messageId, version) + errorResponseDom.appendTagRoute("server.name", serverName) + return errorResponseDom + + responseDom = XDOM() + if not responseDom.parseString(successStatusDict[serverName]["StdOutString"]): + errorResponseDom = ResponseXml(Commands.COMMAND_GET_SERVER_NETWORK_CONFIG, + "Invalid response of get server network config", messageId, version) + errorResponseDom.appendTagRoute("server.name", serverName) + return errorResponseDom + + #configDom = getServerNetworkConfigFromLocalFile(serverName) + #if not (configDom and compareServerNetworkDom(configDom, responseDom)): + # updateServerNetworkConfigXmlFile(serverName, responseDom) + # syncConfiguration() + return responseDom diff --git a/com.gluster.storage.management.server.scripts/src/server/TransportAgent.py b/com.gluster.storage.management.server.scripts/src/server/TransportAgent.py new file mode 100644 index 00000000..5f39b585 --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/TransportAgent.py @@ -0,0 +1,26 @@ +import Commands +#from Common import log +from Protocol import * +from RequestHandler import * + +def processRequest(requestDom): + Globals.REQUEST_MAP = { + Commands.COMMAND_GET_SERVER_NETWORK_CONFIG : {"handle":handleRequestGetServerNetworkConfig, + "pre-run":False, "run":True, "post-run":False, \ + "cleanup":False, "sync-config":False, "safemode":False}} + + messageId = requestDom.getMessageId() + if not messageId: + log("invalid message from web agent") + return None + + requestCommand = requestDom.getRequestCommand() + if not requestCommand: + log("invalid request from web agent") + return None + + try: + requestCommand = Globals.REQUEST_MAP[requestCommand]['handle'] + except KeyError: # Handler not found! + return ResponseXml(requestCommand, "Invalid command", messageId, version) + return requestCommand(requestDom) diff --git a/com.gluster.storage.management.server.scripts/src/server/transport.py b/com.gluster.storage.management.server.scripts/src/server/transport.py new file mode 100755 index 00000000..9255ff40 --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/transport.py @@ -0,0 +1,94 @@ +#!/usr/bin/python +# Copyright (C) 2009 Gluster, Inc. <http://www.gluster.com> +# This file is part of Gluster Storage Platform. +# +# Gluster Storage Platform is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 of +# the License, or (at your option) any later version. +# +# Gluster Storage Platform is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# <http://www.gnu.org/licenses/>. + +import sys +import syslog +import signal +import datetime +from Globals import * +from Protocol import * +from TransportAgent import * +from optparse import OptionParser + +class TimeoutException(Exception): + pass + +def timeoutSignal(signum, frame): + raise TimeoutException, "Timed out" + +def main(): + openLog(Globals.TRANSPORT_AGENT_LOG_FILE) + parser = OptionParser(version="%transport " + Globals.GLUSTER_PLATFORM_VERSION) + parser.add_option("-f", "--force", + action="store_true", dest="force", default=False, + help="Execute command forcefully") + + parser.add_option("-t", "--timeout", + type="int", nargs=1, dest="timeout", + help="Session time-out") + + parser.add_option("--debug", + action="store_true", dest="debug", default=False, + help="Enable debug mode") + (options, args) = parser.parse_args() + Globals.GLUSTER_DEBUG = options.debug + + if len(args) != 1: + #print "usage: Transport.py [-f | --force] [-t N | --timeout=N] [--debug] <input-file>" + log(syslog.LOG_ERR, "invalid arguments") + sys.exit(-1) + + signal.signal(signal.SIGALRM, timeoutSignal) + signal.alarm(options.timeout) + inputFile = args[0] + #outputFile = args[1] + try: + requestString = open(inputFile).read() + if not requestString: + sys.exit(-1) + fp = open("/tmp/transport.log", "a") + fp.write("\n%s: Send: %s\n" % (str(datetime.now()), requestString)) + fp.close() + except IOError: + log(syslog.LOG_ERR, "Unable to read input xml file %s" % inputFile) + sys.exit(-1) + + requestDom = RequestXml(requestString) + if not requestDom: + log(syslog.LOG_ERR, "error: invalid request: %s" % requestString) + sys.exit(-1) + + responseDom = processRequest(requestDom) + if not responseDom: + log(syslog.LOG_ERR, "command execution failed") + sys.exit(-1) + + #fp = open("/tmp/transport.log", "a") + #fp.write("%s: Receive: %s\n" % (str(datetime.now()), responseDom.toxml())) + #fp.close() + + #responseDom.writexml(outputFile) + print responseDom.toxml() + sys.exit(0) + +if __name__ == "__main__": + try: + main() + except TimeoutException: + log(syslog.LOG_ERR, "session timed out") + sys.exit(-1) diff --git a/com.gluster.storage.management.server.scripts/src/server/vmware-discover-servers.py b/com.gluster.storage.management.server.scripts/src/server/vmware-discover-servers.py new file mode 100755 index 00000000..6ac15fed --- /dev/null +++ b/com.gluster.storage.management.server.scripts/src/server/vmware-discover-servers.py @@ -0,0 +1,83 @@ +#!/usr/bin/python +# Copyright (C) 2009 Gluster, Inc. <http://www.gluster.com> +# This file is part of Gluster Storage Platform. +# +# Gluster Storage Platform is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 of +# the License, or (at your option) any later version. +# +# Gluster Storage Platform is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# <http://www.gnu.org/licenses/>. + +import sys +import socket +import signal +import struct +import syslog +import Globals +import Common + +class TimeoutException(Exception): + pass + +def timeoutSignal(signum, frame): + raise TimeoutException, "Timed out" + +def serverDiscoveryRequest(multiCastGroup, port): + servers = [] + # Sending request to all the servers + socketSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + socketSend.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + socketSend.sendto("ServerDiscovery", (multiCastGroup, port)) + + # Waiting for the response + socketReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + socketReceive.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + socketReceive.bind(('', port)) + mreq = struct.pack("4sl", socket.inet_aton(multiCastGroup), socket.INADDR_ANY) + + socketReceive.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + socketSend.sendto("ServerDiscovery", (multiCastGroup, port)) + + try: + while True: + response = socketReceive.recvfrom(200) + if response and response[0].upper() != "SERVERDISCOVERY": + servers.append(response[0]) + signal.signal(signal.SIGALRM, timeoutSignal) + signal.alarm(3) + except TimeoutException: + return servers + return None + +def main(): + syslog.openlog("discovery server request") + servers = serverDiscoveryRequest(Globals.MULTICAST_GROUP, Globals.MULTICAST_PORT) + if not servers: + Common.log(syslog.LOG_ERR, "Failed to discover new servers") + sys.exit(-1) + + servers = set(servers) + try: + #fp = open(Globals.DISCOVERED_SERVER_LIST_FILENAME, "w") + #fp.writelines(list(servers)) + #fp.close() + for server in servers: + print server + except IOError: + Common.log(syslog.LOG_ERR, "Unable to open file %s" % Globals.DISCOVERED_SERVER_LIST_FILENAME) + sys.exit(-1) + + #for serverName in servers: + # print serverName + sys.exit(0) + +if __name__ == "__main__": + main() |
