diff options
Diffstat (limited to 'events/src')
| -rw-r--r-- | events/src/Makefile.am | 41 | ||||
| -rw-r--r-- | events/src/__init__.py | 10 | ||||
| -rw-r--r-- | events/src/eventsapiconf.py.in | 59 | ||||
| -rw-r--r-- | events/src/eventsconfig.json | 5 | ||||
| -rw-r--r-- | events/src/gf_event.py | 60 | ||||
| -rw-r--r-- | events/src/glustereventsd.py | 159 | ||||
| -rw-r--r-- | events/src/handlers.py | 40 | ||||
| -rw-r--r-- | events/src/peer_eventsapi.py | 669 | ||||
| -rw-r--r-- | events/src/utils.py | 445 |
9 files changed, 1488 insertions, 0 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am new file mode 100644 index 00000000000..3b229691897 --- /dev/null +++ b/events/src/Makefile.am @@ -0,0 +1,41 @@ +noinst_PYTHON = $(top_srcdir)/events/eventskeygen.py +EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \ + handlers.py utils.py peer_eventsapi.py eventsconfig.json gf_event.py + +BUILT_SOURCES = eventtypes.py +CLEANFILES = eventtypes.py + +eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents +if BUILD_EVENTS +events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \ + utils.py +endif +# this does not work, see the Makefile.am in the root for a workaround +#nodist_events_PYTHON = eventtypes.py + +eventtypes.py: $(top_srcdir)/events/eventskeygen.py + $(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER + +if BUILD_EVENTS +eventspeerscriptdir = $(GLUSTERFS_LIBEXECDIR) +eventsconfdir = $(sysconfdir)/glusterfs +eventsconf_DATA = eventsconfig.json + +events_PYTHON += handlers.py +events_SCRIPTS = glustereventsd.py +eventspeerscript_SCRIPTS = peer_eventsapi.py + +install-exec-hook: + $(mkdir_p) $(DESTDIR)$(sbindir) + rm -f $(DESTDIR)$(sbindir)/glustereventsd + ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \ + $(DESTDIR)$(sbindir)/glustereventsd + rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi + ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \ + $(DESTDIR)$(sbindir)/gluster-eventsapi + +uninstall-hook: + rm -f $(DESTDIR)$(sbindir)/glustereventsd + rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi + +endif diff --git a/events/src/__init__.py b/events/src/__init__.py new file mode 100644 index 00000000000..f27c53a4df4 --- /dev/null +++ b/events/src/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. 
+# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in new file mode 100644 index 00000000000..700093bee60 --- /dev/null +++ b/events/src/eventsapiconf.py.in @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import subprocess +glusterd_workdir = None + +# Methods +def get_glusterd_workdir(): + global glusterd_workdir + if glusterd_workdir is not None: + return glusterd_workdir + proc = subprocess.Popen(["gluster", "system::", "getwd"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines = True) + out, err = proc.communicate() + if proc.returncode == 0: + glusterd_workdir = out.strip() + else: + glusterd_workdir = "@GLUSTERD_WORKDIR@" + return glusterd_workdir + +SERVER_ADDRESS = "0.0.0.0" +SERVER_ADDRESSv4 = "0.0.0.0" +SERVER_ADDRESSv6 = "::1" +DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" +CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" +CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC +WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json" +WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC +LOG_FILE = "@localstatedir@/log/glusterfs/events.log" +EVENTSD = "glustereventsd" +CONFIG_KEYS = ["log-level", "port", "disable-events-log"] +BOOL_CONFIGS = ["disable-events-log"] +INT_CONFIGS = ["port"] +RESTART_CONFIGS = ["port"] +EVENTS_ENABLED = 
@EVENTS_ENABLED@ +UUID_FILE = get_glusterd_workdir() + "/glusterd.info" +PID_FILE = "@localstatedir@/run/glustereventsd.pid" +AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"] +AUTO_INT_ATTRIBUTES = ["ssh-port"] +CERTS_DIR = get_glusterd_workdir() + "/events" + +# Errors +ERROR_SAME_CONFIG = 2 +ERROR_ALL_NODES_STATUS_NOT_OK = 3 +ERROR_PARTIAL_SUCCESS = 4 +ERROR_WEBHOOK_ALREADY_EXISTS = 5 +ERROR_WEBHOOK_NOT_EXISTS = 6 +ERROR_INVALID_CONFIG = 7 +ERROR_WEBHOOK_SYNC_FAILED = 8 +ERROR_CONFIG_SYNC_FAILED = 9 diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json new file mode 100644 index 00000000000..89e5b9c1d68 --- /dev/null +++ b/events/src/eventsconfig.json @@ -0,0 +1,5 @@ +{ + "log-level": "INFO", + "port": 24009, + "disable-events-log": false +} diff --git a/events/src/gf_event.py b/events/src/gf_event.py new file mode 100644 index 00000000000..260b0d9aa48 --- /dev/null +++ b/events/src/gf_event.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# + +import socket +import time + +from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED +from gfevents.eventtypes import all_events + +from gfevents.utils import logger, setup_logger, get_config + +# Run this when this lib loads +setup_logger() + + +def gf_event(event_type, **kwargs): + if EVENTS_ENABLED == 0: + return + + if not isinstance(event_type, int) or event_type >= len(all_events): + logger.error("Invalid Event Type: {0}".format(event_type)) + return + + try: + client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except socket.error as e: + logger.error("Unable to connect to events Server: {0}".format(e)) + return + + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") + return + + # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. + msg = "" + for k, v in kwargs.items(): + msg += "{0}={1};".format(k, v) + + # <TIMESTAMP> <EVENT_TYPE> <MSG> + msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode() + + try: + sent = client.sendto(msg, (SERVER_ADDRESS, port)) + assert sent == len(msg) + except socket.error as e: + logger.error("Unable to Send message: {0}".format(e)) + except AssertionError: + logger.error("Unable to send message. Sent: {0}, Actual: {1}".format( + sent, len(msg))) + finally: + client.close() diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py new file mode 100644 index 00000000000..341a3b60947 --- /dev/null +++ b/events/src/glustereventsd.py @@ -0,0 +1,159 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# + +from __future__ import print_function +import sys +import signal +import threading +try: + import socketserver +except ImportError: + import SocketServer as socketserver +import socket +from argparse import ArgumentParser, RawDescriptionHelpFormatter + +from eventtypes import all_events +import handlers +import utils +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE +from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES +from utils import logger, PidFile, PidFileLockFailed, boolify + +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET + +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): + + def handle(self): + data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) + try: + # Event Format <TIMESTAMP> <TYPE> <DETAIL> + ts, key, value = data.split(" ", 2) + except ValueError: + logger.warn("Invalid Event Format {0}".format(data)) + return + + data_dict = {} + try: + # Format key=value;key=value + data_dict = dict(x.split('=') for x in value.split(';')) + except ValueError: + logger.warn("Unable to parse Event {0}".format(data)) + return + + for k, v in data_dict.items(): + try: + if k in AUTO_BOOL_ATTRIBUTES: + data_dict[k] = boolify(v) + if k in AUTO_INT_ATTRIBUTES: + data_dict[k] = int(v) + except ValueError: + # Auto Conversion failed, Retain the old value + continue + + try: + # Event Type to Function Map, Received event data will be in + # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the + # received Type/Key and construct a function name starting with + # handle_ For example: handle_event_volume_create + func_name 
= "handle_" + all_events[int(key)].lower() + except IndexError: + # This type of Event is not handled? + logger.warn("Unhandled Event: {0}".format(key)) + func_name = None + + if func_name is not None: + # Get function from handlers module + func = getattr(handlers, func_name, None) + # If func is None, then handler unimplemented for that event. + if func is not None: + func(ts, int(key), data_dict) + else: + # Generic handler, broadcast whatever received + handlers.generic_handler(ts, int(key), data_dict) + + +def signal_handler_sigusr2(sig, frame): + utils.load_all() + utils.restart_webhook_pool() + + +def UDP_server_thread(sock): + sock.serve_forever() + + +def init_event_server(): + utils.setup_logger() + utils.load_all() + utils.init_webhook_pool() + + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets + try: + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) + sys.exit(1) + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() + + +def get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=__doc__) + parser.add_argument("-p", "--pid-file", help="PID File", + default=PID_FILE) + + return parser.parse_args() + + +def main(): + args = get_args() + try: + with PidFile(args.pid_file): + init_event_server() + except PidFileLockFailed as e: + 
sys.stderr.write("Failed to get lock for pid file({0}): {1}".format( + args.pid_file, e)) + except KeyboardInterrupt: + sys.exit(1) + + +if __name__ == "__main__": + signal.signal(signal.SIGUSR2, signal_handler_sigusr2) + main() diff --git a/events/src/handlers.py b/events/src/handlers.py new file mode 100644 index 00000000000..7746d488bf3 --- /dev/null +++ b/events/src/handlers.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import utils + + +def generic_handler(ts, key, data): + """ + Generic handler to broadcast message to all peers, custom handlers + can be created by func name handler_<event_name> + Ex: handle_event_volume_create(ts, key, data) + """ + utils.publish(ts, key, data) + + +def handle_event_volume_set(ts, key, data): + """ + Received data will have all the options as one string, split into + list of options. "key1,value1,key2,value2" into + [[key1, value1], [key2, value2]] + """ + opts = data.get("options", "").strip(",").split(",") + data["options"] = [] + for i, opt in enumerate(opts): + if i % 2 == 0: + # Add new array with key + data["options"].append([opt]) + else: + # Add to the last added array + data["options"][-1].append(opt) + + utils.publish(ts, key, data) diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py new file mode 100644 index 00000000000..4d2e5f35b1c --- /dev/null +++ b/events/src/peer_eventsapi.py @@ -0,0 +1,669 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. 
+# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +from __future__ import print_function +import os +import json +from errno import EEXIST +import fcntl +from errno import EACCES, EAGAIN +import signal +import sys +import time + +import requests +from prettytable import PrettyTable + +from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, + sync_file_to_peers, GlusterCmdException, + output_error, execute_in_peers, runcli, + set_common_args_func) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) + + +def handle_output_error(err, errcode=1, json_output=False): + if json_output: + print (json.dumps({ + "output": "", + "error": err + })) + sys.exit(errcode) + else: + output_error(err, errcode) + + +def file_content_overwrite(fname, data): + with open(fname + ".tmp", "w") as f: + f.write(json.dumps(data)) + + os.rename(fname + ".tmp", fname) + + +def create_custom_config_file_if_not_exists(args): + try: + config_dir = os.path.dirname(CUSTOM_CONFIG_FILE) + mkdirp(config_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (config_dir, e), + json_output=args.json) + + if not os.path.exists(CUSTOM_CONFIG_FILE): + with open(CUSTOM_CONFIG_FILE, "w") as f: + f.write("{}") + + +def create_webhooks_file_if_not_exists(args): + try: 
+ webhooks_dir = os.path.dirname(WEBHOOKS_FILE) + mkdirp(webhooks_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e), + json_output=args.json) + + if not os.path.exists(WEBHOOKS_FILE): + with open(WEBHOOKS_FILE, "w") as f: + f.write("{}") + + +def boolify(value): + val = False + if value.lower() in ["enabled", "true", "on", "yes"]: + val = True + return val + + +def mkdirp(path, exit_on_err=False, logger=None): + """ + Try creating required directory structure + ignore EEXIST and raise exception for rest of the errors. + Print error in stderr and exit + """ + try: + os.makedirs(path) + except OSError as e: + if e.errno != EEXIST or not os.path.isdir(path): + raise + + +def is_active(): + state = False + try: + with open(PID_FILE, "a+") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + state = False + except (IOError, OSError) as e: + if e.errno in (EACCES, EAGAIN): + # cannot grab. so, process still running..move on + state = True + else: + state = False + return state + + +def reload_service(): + pid = None + if is_active(): + with open(PID_FILE) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = None + if pid is not None: + os.kill(pid, signal.SIGUSR2) + + return (0, "", "") + + +def rows_to_json(json_out, column_name, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + json_out.append({ + "node": row.hostname, + "node_status": "UP" if row.node_up else "DOWN", + column_name: "OK" if row.ok else "NOT OK", + "error": row.error + }) + return num_ok_rows + + +def rows_to_table(table, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + table.add_row([row.hostname, + "UP" if row.node_up else "DOWN", + "OK" if row.ok else "NOT OK: {0}".format( + row.error)]) + return num_ok_rows + + +def sync_to_peers(args): + if os.path.exists(WEBHOOKS_FILE): + try: + sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) + except GlusterCmdException as e: 
+ # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Webhooks file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_WEBHOOK_SYNC_FAILED, + json_output=args.json) + + if os.path.exists(CUSTOM_CONFIG_FILE): + try: + sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) + except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Config file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_CONFIG_SYNC_FAILED, + json_output=args.json) + + out = execute_in_peers("node-reload") + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["SYNC STATUS"] = "r" + + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "sync_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) + + # If sync status is not ok for any node set error code as partial success + sys.exit(ret) + + +def node_output_handle(resp): + rc, out, err = resp + if rc == 0: + node_output_ok(out) + else: + node_output_notok(err) + + +def action_handle(action, json_output=False): + out = execute_in_peers("node-" + action) + column_name = action.upper() + if action == "status": + column_name = EVENTSD.upper() + + if not json_output: + table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"]) + table.align["NODE STATUS"] = "r" + table.align[column_name + " STATUS"] = "r" + + json_out = [] + if json_output: + rows_to_json(json_out, column_name.lower() + "_status", out) + else: + rows_to_table(table, out) + + return json_out if json_output else table + + 
+class NodeReload(Cmd): + name = "node-reload" + + def run(self, args): + node_output_handle(reload_service()) + + +class ReloadCmd(Cmd): + name = "reload" + + def run(self, args): + out = action_handle("reload", args.json) + if args.json: + print (json.dumps({ + "output": out, + "error": "" + })) + else: + print (out) + + +class NodeStatus(Cmd): + name = "node-status" + + def run(self, args): + node_output_ok("UP" if is_active() else "DOWN") + + +class StatusCmd(Cmd): + name = "status" + + def run(self, args): + webhooks = {} + if os.path.exists(WEBHOOKS_FILE): + webhooks = json.load(open(WEBHOOKS_FILE)) + + json_out = {"webhooks": [], "data": []} + if args.json: + json_out["webhooks"] = webhooks.keys() + else: + print ("Webhooks: " + ("" if webhooks else "None")) + for w in webhooks: + print (w) + + print () + + out = action_handle("status", args.json) + if args.json: + json_out["data"] = out + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (out) + + +class WebhookAddCmd(Cmd): + name = "webhook-add" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is not None: + handle_output_error("Webhook already exists", + errcode=ERROR_WEBHOOK_ALREADY_EXISTS, + json_output=args.json) + + data[args.url] = {"token": args.bearer_token, + "secret": args.secret} + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class WebhookModCmd(Cmd): + name = "webhook-mod" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + 
parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) + + if isinstance(data[args.url], str): + data[args.url]["token"] = data[args.url] + + if args.bearer_token != "": + data[args.url]["token"] = args.bearer_token + + if args.secret != "": + data[args.url]["secret"] = args.secret + + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class WebhookDelCmd(Cmd): + name = "webhook-del" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) + + del data[args.url] + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class NodeWebhookTestCmd(Cmd): + name = "node-webhook-test" + + def args(self, parser): + parser.add_argument("url") + parser.add_argument("bearer_token") + parser.add_argument("secret") + + def run(self, args): + http_headers = {} + hashval = "" + if args.bearer_token != ".": + hashval = args.bearer_token + + if args.secret != ".": + hashval = get_jwt_token(args.secret, "TEST", int(time.time())) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + urldata = requests.utils.urlparse(args.url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip()) + verify = True + while True: 
+ try: + resp = requests.post(args.url, headers=http_headers, + verify=verify) + # Successful webhook push + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception: + verify = False + + # Done with collecting cert, continue + continue + except Exception as e: + node_output_notok("{0}".format(e)) + break + + if resp.status_code != 200: + node_output_notok("{0}".format(resp.status_code)) + + node_output_ok() + + +class WebhookTestCmd(Cmd): + name = "webhook-test" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token") + parser.add_argument("--secret", "-s", + help="Secret to generate Bearer Token") + + def run(self, args): + url = args.url + bearer_token = args.bearer_token + secret = args.secret + + if not args.url: + url = "." + if not args.bearer_token: + bearer_token = "." + if not args.secret: + secret = "." 
+ + out = execute_in_peers("node-webhook-test", [url, bearer_token, + secret]) + + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["WEBHOOK STATUS"] = "r" + + num_ok_rows = 0 + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "webhook_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) + + sys.exit(ret) + + +class ConfigGetCmd(Cmd): + name = "config-get" + + def args(self, parser): + parser.add_argument("--name", help="Config Name") + + def run(self, args): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + data.update(json.load(open(CUSTOM_CONFIG_FILE))) + + if args.name is not None and args.name not in CONFIG_KEYS: + handle_output_error("Invalid Config item", + errcode=ERROR_INVALID_CONFIG, + json_output=args.json) + + if args.json: + json_out = {} + if args.name is None: + json_out = data + else: + json_out[args.name] = data[args.name] + + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + table = PrettyTable(["NAME", "VALUE"]) + if args.name is None: + for k, v in data.items(): + table.add_row([k, v]) + else: + table.add_row([args.name, data[args.name]]) + + print (table) + + +def read_file_content_json(fname): + content = "{}" + with open(fname) as f: + content = f.read() + if content.strip() == "": + content = "{}" + + return json.loads(content) + + +class ConfigSetCmd(Cmd): + name = "config-set" + + def args(self, parser): + parser.add_argument("name", help="Config Name") + parser.add_argument("value", help="Config Value") + + def run(self, args): + if args.name not in CONFIG_KEYS: + handle_output_error("Invalid Config item", + 
errcode=ERROR_INVALID_CONFIG, + json_output=args.json) + + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + config_json = read_file_content_json(CUSTOM_CONFIG_FILE) + data.update(config_json) + + # Do Nothing if same as previous value + if data[args.name] == args.value: + handle_output_error("Config value not changed. Same config", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) + + # TODO: Validate Value + new_data = read_file_content_json(CUSTOM_CONFIG_FILE) + + v = args.value + if args.name in BOOL_CONFIGS: + v = boolify(args.value) + + if args.name in INT_CONFIGS: + v = int(args.value) + + new_data[args.name] = v + file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) + + # If any value changed which requires restart of REST server + restart = False + if args.name in RESTART_CONFIGS: + restart = True + + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) + + +class ConfigResetCmd(Cmd): + name = "config-reset" + + def args(self, parser): + parser.add_argument("name", help="Config Name or all") + + def run(self, args): + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): + changed_keys = [] + data = {} + if os.path.exists(CUSTOM_CONFIG_FILE): + data = read_file_content_json(CUSTOM_CONFIG_FILE) + + # If No data available in custom config or, the specific config + # item is not available in custom config + if not data or \ + (args.name != "all" and data.get(args.name, None) is None): + handle_output_error("Config value not reset. 
Already " + "set to default value", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) + + if args.name.lower() == "all": + for k, v in data.items(): + changed_keys.append(k) + + # Reset all keys + file_content_overwrite(CUSTOM_CONFIG_FILE, {}) + else: + changed_keys.append(args.name) + del data[args.name] + file_content_overwrite(CUSTOM_CONFIG_FILE, data) + + # If any value changed which requires restart of REST server + restart = False + for key in changed_keys: + if key in RESTART_CONFIGS: + restart = True + break + + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) + + +class SyncCmd(Cmd): + name = "sync" + + def run(self, args): + sync_to_peers(args) + + +def common_args(parser): + parser.add_argument("--json", help="JSON Output", action="store_true") + + +if __name__ == "__main__": + set_common_args_func(common_args) + runcli() diff --git a/events/src/utils.py b/events/src/utils.py new file mode 100644 index 00000000000..6d4e0791a2b --- /dev/null +++ b/events/src/utils.py @@ -0,0 +1,445 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
#

import sys
import json
import os
import logging
import logging.handlers
import fcntl
from errno import EBADF
from threading import Thread
import multiprocessing
try:
    from queue import Queue
except ImportError:
    from Queue import Queue
from datetime import datetime, timedelta
import base64
import hmac
from hashlib import sha256
from calendar import timegm

# Make the parent package (gfevents) importable when this module is loaded
# from the installed libexec directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from gfevents.eventsapiconf import (LOG_FILE,
                                    WEBHOOKS_FILE,
                                    DEFAULT_CONFIG_FILE,
                                    CUSTOM_CONFIG_FILE,
                                    UUID_FILE,
                                    CERTS_DIR)
from gfevents import eventtypes


# Webhooks list
_webhooks = {}
_webhooks_file_mtime = 0
# Default Log Level
_log_level = "INFO"
# Config Object
_config = {}

# Init Logger instance
logger = logging.getLogger(__name__)
NodeID = None
webhooks_pool = None


def boolify(value):
    """Return True when value looks like an affirmative flag
    ("1", "on", "true", "yes", case-insensitive), else False."""
    return str(value).lower() in ["1", "on", "true", "yes"]


def log_event(data):
    """Log all published events unless disabled via the
    "disable-events-log" config option."""
    if not _config.get("disable-events-log", False):
        logger.info(repr(data))


def get_node_uuid():
    """Read this node's UUID from the glusterd info file
    (a line of the form ``UUID=<value>``).

    Returns the UUID string, or None when no UUID line exists.
    """
    val = None
    with open(UUID_FILE) as f:
        for line in f:
            if line.startswith("UUID="):
                val = line.strip().split("=")[-1]
                break
    return val


def get_config(key, default_value=None):
    """Return a configuration value, lazily loading the config files
    on first access."""
    if not _config:
        load_config()
    return _config.get(key, default_value)


def get_event_type_name(idx):
    """
    Returns Event Type text from the index. For example, VOLUME_CREATE
    """
    return eventtypes.all_events[idx].replace("EVENT_", "")


def setup_logger():
    """
    Logging initialization, Log level by default will be INFO, once config
    file is read, respective log_level will be set.
    """
    global logger
    logger.setLevel(logging.INFO)

    # WatchedFileHandler reopens LOG_FILE when it is rotated externally
    fh = logging.handlers.WatchedFileHandler(LOG_FILE)

    formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
                                  "[%(module)s - %(lineno)s:%(funcName)s] "
                                  "- %(message)s")

    fh.setFormatter(formatter)

    # add handler to logger object
    logger.addHandler(fh)


def load_config():
    """
    Load/Reload the config from REST Config files. This function will
    be triggered during init and when SIGUSR2.
    """
    global _config
    _config = {}
    # Use context managers so the config file descriptors are closed
    # (json.load(open(...)) leaked them)
    if os.path.exists(DEFAULT_CONFIG_FILE):
        with open(DEFAULT_CONFIG_FILE) as f:
            _config = json.load(f)
    if os.path.exists(CUSTOM_CONFIG_FILE):
        with open(CUSTOM_CONFIG_FILE) as f:
            _config.update(json.load(f))


def load_log_level():
    """
    Reads log_level from Config file and sets accordingly. This function will
    be triggered during init and when SIGUSR2.
    """
    global logger, _log_level
    new_log_level = _config.get("log-level", "INFO")
    if _log_level != new_log_level:
        logger.setLevel(getattr(logging, new_log_level.upper()))
        _log_level = new_log_level.upper()


def load_webhooks():
    """
    Load/Reload the webhooks list. This function will
    be triggered during init and when SIGUSR2.
    """
    global _webhooks, _webhooks_file_mtime
    _webhooks = {}
    if os.path.exists(WEBHOOKS_FILE):
        with open(WEBHOOKS_FILE) as f:
            _webhooks = json.load(f)
        # Remember mtime so autoload_webhooks() can detect changes cheaply
        st = os.lstat(WEBHOOKS_FILE)
        _webhooks_file_mtime = st.st_mtime


def load_all():
    """
    Wrapper function to call all load/reload functions. This function will
    be triggered during init and when SIGUSR2.
    """
    load_config()
    load_webhooks()
    load_log_level()


def publish(ts, event_key, data):
    """Build the event message, log it, and push it to all configured
    webhooks (if any are registered)."""
    global NodeID
    if NodeID is None:
        NodeID = get_node_uuid()

    autoload_webhooks()

    message = {
        "nodeid": NodeID,
        "ts": int(ts),
        "event": get_event_type_name(event_key),
        "message": data
    }

    log_event(message)

    if _webhooks:
        plugin_webhook(message)
    else:
        # TODO: Default action?
        pass


def autoload_webhooks():
    """Reload the webhooks file only when its mtime differs from the
    last recorded one, keeping publish() cheap in the common case."""
    global _webhooks_file_mtime
    try:
        st = os.lstat(WEBHOOKS_FILE)
    except OSError:
        st = None

    if st is not None:
        # If Stat is available and mtime is not matching with
        # previously recorded mtime, reload the webhooks file
        if st.st_mtime != _webhooks_file_mtime:
            load_webhooks()


def to_binary(value):
    """Encode str to UTF-8 bytes; pass bytes through unchanged."""
    return value.encode("utf-8") if isinstance(value, str) else value


def base64_urlencode(inp):
    """URL-safe base64 encode without '=' padding (JWT segment format).

    Accepts str or bytes and returns str so segments can be joined with
    ".".  (The original called bytes.replace with str arguments, which
    raises TypeError on Python 3.)
    """
    encoded = base64.urlsafe_b64encode(to_binary(inp)).decode("utf-8")
    return encoded.replace("=", "").strip()


def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
    """Create a signed JWT (HS256) used to authenticate webhook pushes.

    secret: shared secret used as the HMAC key
    event_type: becomes the "sub" claim
    event_ts: becomes the "iat" claim
    jwt_expiry_time_seconds: lifetime used for the "exp" claim
    """
    exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
    payload = {
        "exp": timegm(exp.utctimetuple()),
        "iss": "gluster",
        "sub": event_type,
        "iat": event_ts
    }
    header = '{"alg":"HS256","typ":"JWT"}'
    payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
    msg = base64_urlencode(header) + "." + base64_urlencode(payload)
    # hmac.HMAC requires a bytes key and message on Python 3
    # (str(secret) broke there)
    mac = hmac.HMAC(to_binary(secret), to_binary(msg), sha256)
    return "%s.%s" % (msg, base64_urlencode(mac.digest()))


def save_https_cert(domain, port, cert_path):
    """Download and cache the server certificate for a webhook host so
    later requests can verify against it."""
    import ssl

    # Cert file already available for this URL
    if os.path.exists(cert_path):
        return

    cert_data = ssl.get_server_certificate((domain, port))
    with open(cert_path, "w") as f:
        f.write(cert_data)


def publish_to_webhook(url, token, secret, message_queue):
    """Thread worker: consume (event_type, event_ts, message_json)
    tuples from message_queue forever and POST each to `url`.

    Authorization header is "Bearer <token>" when a static token is
    configured, or a per-event JWT when a secret is configured (secret
    takes precedence).  For https URLs whose certificate fails default
    verification, the server certificate is downloaded once and used;
    if that also fails, verification is disabled with a warning.
    """
    # Import requests here since not used in any other place
    import requests

    http_headers = {"Content-Type": "application/json"}
    urldata = requests.utils.urlparse(url)
    parts = urldata.netloc.split(":")
    domain = parts[0]
    # Default https port if not specified
    port = 443
    if len(parts) == 2:
        port = int(parts[1])

    cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())

    while True:
        hashval = ""
        event_type, event_ts, message_json = message_queue.get()
        if token != "" and token is not None:
            hashval = token

        if secret != "" and secret is not None:
            hashval = get_jwt_token(secret, event_type, event_ts)

        if hashval:
            http_headers["Authorization"] = "Bearer " + hashval

        verify = True
        while True:
            try:
                resp = requests.post(url, headers=http_headers,
                                     data=message_json,
                                     verify=verify)
                # Successful webhook push
                message_queue.task_done()
                if resp.status_code != 200:
                    # logger.warn is deprecated; use warning()
                    logger.warning("Event push failed to URL: {url}, "
                                   "Event: {event}, "
                                   "Status Code: {status_code}".format(
                                       url=url,
                                       event=message_json,
                                       status_code=resp.status_code))
                break
            except requests.exceptions.SSLError as e:
                # If verify is equal to cert path, but still failed with
                # SSLError, Looks like some issue with custom downloaded
                # certificate, Try with verify = false
                if verify == cert_path:
                    logger.warning("Event push failed with certificate, "
                                   "ignoring verification url={0} "
                                   "Error={1}".format(url, e))
                    verify = False
                    continue

                # If verify is instance of bool and True, then custom cert
                # is required, download the cert and retry
                try:
                    save_https_cert(domain, port, cert_path)
                    verify = cert_path
                except Exception as ex:
                    verify = False
                    logger.warning("Unable to get Server certificate, "
                                   "ignoring verification url={0} "
                                   "Error={1}".format(url, ex))

                # Done with collecting cert, continue
                continue
            except Exception as e:
                logger.warning("Event push failed to URL: {url}, "
                               "Event: {event}, "
                               "Status: {error}".format(
                                   url=url,
                                   event=message_json,
                                   error=e))
                message_queue.task_done()
                break


def plugin_webhook(message):
    """Serialize the event and hand it to the webhook pool for delivery."""
    message_json = json.dumps(message, sort_keys=True)
    logger.debug("EVENT: {0}".format(message_json))
    webhooks_pool.send(message["event"], message["ts"], message_json)


class LockedOpen(object):
    """Context manager: open a file with an exclusive flock held for
    the duration of the `with` block."""

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        """
        If two processes compete to update a file, The first process
        gets the lock and the second process is blocked in the fcntl.flock()
        call. When first process replaces the file and releases the lock,
        the already open file descriptor in the second process now points
        to a "ghost" file(not reachable by any path name) with old contents.
        To avoid that conflict, check the fd already opened is same or
        not. Open new one if not same
        """
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.fileobj.close()


class PidFileLockFailed(Exception):
    """Raised when the pid file is already locked by another process."""
    pass


class PidFile(object):
    """Context manager that creates and flock()s a pid file, writes the
    current pid into it, and removes it on exit."""

    def __init__(self, filename):
        self.filename = filename
        self.pid = os.getpid()
        self.fh = None

    def cleanup(self, remove_file=True):
        """Close the pid file handle and (optionally) remove the file."""
        try:
            if self.fh is not None:
                self.fh.close()
        except IOError as exc:
            # EBADF just means it was already closed; anything else is real
            if exc.errno != EBADF:
                raise
        finally:
            if os.path.isfile(self.filename) and remove_file:
                os.remove(self.filename)

    def __enter__(self):
        self.fh = open(self.filename, 'a+')
        try:
            # Non-blocking exclusive lock: fail fast if already running
            fcntl.flock(self.fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as exc:
            # Keep the existing file: it belongs to the lock holder
            self.cleanup(remove_file=False)
            raise PidFileLockFailed(exc)

        self.fh.seek(0)
        self.fh.truncate()
        self.fh.write("%d\n" % self.pid)
        self.fh.flush()
        self.fh.seek(0)
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.cleanup()


def webhook_monitor(proc_queue, webhooks):
    """Process entry point: start one delivery thread per webhook and
    fan out every message from proc_queue to all thread queues.

    Webhook entries may be a plain token string or a dict with
    "token" and "secret" keys.
    """
    queues = {}
    for url, data in webhooks.items():
        if isinstance(data, str):
            token = data
            secret = None
        else:
            token = data["token"]
            secret = data["secret"]

        queues[url] = Queue()
        t = Thread(target=publish_to_webhook, args=(url, token, secret,
                                                    queues[url]))
        t.start()

    # Get the message sent to Process queue and distribute to all thread
    # queues
    while True:
        message = proc_queue.get()
        for _, q in queues.items():
            q.put(message)


class WebhookThreadPool(object):
    """Wrapper around the webhook monitor process, restartable on
    config reload."""

    def start(self):
        # Separate process to emit messages to webhooks
        # which maintains one thread per webhook. Separate
        # process is required since on reload we need to stop
        # and start the thread pool. In Python Threads can't be stopped
        # so terminate the process and start again. Note: In transit
        # events will be missed during reload
        self.queue = multiprocessing.Queue()
        self.proc = multiprocessing.Process(target=webhook_monitor,
                                            args=(self.queue, _webhooks))
        self.proc.start()

    def restart(self):
        # In transit messages are skipped, since webhooks monitor process
        # is terminated.
        self.proc.terminate()
        self.start()

    def send(self, event_type, event_ts, message):
        self.queue.put((event_type, event_ts, message))


def init_webhook_pool():
    """Create and start the global webhook delivery pool."""
    global webhooks_pool
    webhooks_pool = WebhookThreadPool()
    webhooks_pool.start()


def restart_webhook_pool():
    """Restart the global webhook delivery pool (e.g. on SIGUSR2)."""
    global webhooks_pool
    if webhooks_pool is not None:
        webhooks_pool.restart()
