diff options
Diffstat (limited to 'events/src')
| -rw-r--r-- | events/src/Makefile.am | 6 | ||||
| -rw-r--r-- | events/src/eventsapiconf.py.in | 28 | ||||
| -rw-r--r-- | events/src/gf_event.py | 19 | ||||
| -rw-r--r-- | events/src/glustereventsd.py | 51 | ||||
| -rw-r--r-- | events/src/peer_eventsapi.py | 57 | ||||
| -rw-r--r-- | events/src/utils.py | 33 |
6 files changed, 127 insertions, 67 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am index 4308ccdbb22..3b229691897 100644 --- a/events/src/Makefile.am +++ b/events/src/Makefile.am @@ -5,9 +5,11 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \ BUILT_SOURCES = eventtypes.py CLEANFILES = eventtypes.py -eventsdir = $(GLUSTERFS_LIBEXECDIR)/events +eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents +if BUILD_EVENTS events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \ utils.py +endif # this does not work, see the Makefile.am in the root for a workaround #nodist_events_PYTHON = eventtypes.py @@ -26,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py install-exec-hook: $(mkdir_p) $(DESTDIR)$(sbindir) rm -f $(DESTDIR)$(sbindir)/glustereventsd - ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \ + ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \ $(DESTDIR)$(sbindir)/glustereventsd rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \ diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index 687eaa39c00..700093bee60 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -9,12 +9,32 @@ # cases as published by the Free Software Foundation. # +import subprocess +glusterd_workdir = None + +# Methods +def get_glusterd_workdir(): + global glusterd_workdir + if glusterd_workdir is not None: + return glusterd_workdir + proc = subprocess.Popen(["gluster", "system::", "getwd"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines = True) + out, err = proc.communicate() + if proc.returncode == 0: + glusterd_workdir = out.strip() + else: + glusterd_workdir = "@GLUSTERD_WORKDIR@" + return glusterd_workdir + SERVER_ADDRESS = "0.0.0.0" +SERVER_ADDRESSv4 = "0.0.0.0" +SERVER_ADDRESSv6 = "::1" DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" -CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC +CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json" -WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC +WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC LOG_FILE = "@localstatedir@/log/glusterfs/events.log" EVENTSD = "glustereventsd" CONFIG_KEYS = ["log-level", "port", "disable-events-log"] @@ -22,11 +42,11 @@ BOOL_CONFIGS = ["disable-events-log"] INT_CONFIGS = ["port"] RESTART_CONFIGS = ["port"] EVENTS_ENABLED = @EVENTS_ENABLED@ -UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" +UUID_FILE = get_glusterd_workdir() + "/glusterd.info" PID_FILE = "@localstatedir@/run/glustereventsd.pid" AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"] AUTO_INT_ATTRIBUTES = ["ssh-port"] -CERTS_DIR = "@GLUSTERD_WORKDIR@/events" +CERTS_DIR = get_glusterd_workdir() + "/events" # Errors ERROR_SAME_CONFIG = 2 diff --git a/events/src/gf_event.py b/events/src/gf_event.py index f9ece6adc28..260b0d9aa48 100644 --- a/events/src/gf_event.py +++ b/events/src/gf_event.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,10 +12,10 @@ import socket import time -from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED -from eventtypes import all_events +from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED +from gfevents.eventtypes import all_events -from utils import logger, setup_logger, get_config +from gfevents.utils import logger, setup_logger, get_config # Run this when this lib loads setup_logger() @@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs): logger.error("Unable to connect to events Server: {0}".format(e)) return + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") + return + # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. msg = "" for k, v in kwargs.items(): msg += "{0}={1};".format(k, v) # <TIMESTAMP> <EVENT_TYPE> <MSG> - msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) - - port = get_config("port") - if port is None: - logger.error("Unable to get eventsd port details") - return + msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode() try: sent = client.sendto(msg, (SERVER_ADDRESS, port)) diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 606b89cbd7f..341a3b60947 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,22 +13,36 @@ from __future__ import print_function import sys import signal -import SocketServer +import threading +try: + import socketserver +except ImportError: + import SocketServer as socketserver import socket from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS, PID_FILE +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES from utils import logger, PidFile, PidFileLockFailed, boolify +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET -class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + logger.debug("EVENT: {0} from {1}".format(repr(data), self.client_address[0])) try: @@ -46,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): logger.warn("Unable to parse Event {0}".format(data)) return - for k, v in data_dict.iteritems(): + for k, v in data_dict.items(): try: if k in AUTO_BOOL_ATTRIBUTES: data_dict[k] = boolify(v) @@ -83,6 +97,10 @@ def signal_handler_sigusr2(sig, frame): utils.restart_webhook_pool() +def UDP_server_thread(sock): + sock.serve_forever() + + def init_event_server(): utils.setup_logger() utils.load_all() @@ -93,15 +111,26 @@ def init_event_server(): sys.stderr.write("Unable to get Port details from Config\n") sys.exit(1) - # Start the Eventing Server, UDP Server + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets try: - server = SocketServer.ThreadingUDPServer( - (SERVER_ADDRESS, port), - GlusterEventsRequestHandler) + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) except socket.error as e: - sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) sys.exit(1) - server.serve_forever() + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() def get_args(): diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index d72fdbe99c4..4d2e5f35b1c 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -27,28 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, output_error, execute_in_peers, runcli, set_common_args_func) -from events.utils import LockedOpen, get_jwt_token, save_https_cert - -from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - CUSTOM_CONFIG_FILE_TO_SYNC, - EVENTSD, - CONFIG_KEYS, - BOOL_CONFIGS, - INT_CONFIGS, - PID_FILE, - RESTART_CONFIGS, - ERROR_INVALID_CONFIG, - ERROR_WEBHOOK_NOT_EXISTS, - ERROR_CONFIG_SYNC_FAILED, - ERROR_WEBHOOK_ALREADY_EXISTS, - ERROR_PARTIAL_SUCCESS, - ERROR_ALL_NODES_STATUS_NOT_OK, - ERROR_SAME_CONFIG, - ERROR_WEBHOOK_SYNC_FAILED, - CERTS_DIR) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) def handle_output_error(err, errcode=1, json_output=False): @@ -173,8 +173,10 @@ def sync_to_peers(args): try: sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Webhooks file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_WEBHOOK_SYNC_FAILED, json_output=args.json) @@ -182,8 +184,10 @@ def sync_to_peers(args): try: sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Config file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_CONFIG_SYNC_FAILED, json_output=args.json) @@ -349,8 +353,7 @@ class WebhookModCmd(Cmd): errcode=ERROR_WEBHOOK_NOT_EXISTS, json_output=args.json) - if isinstance(data[args.url], str) or \ - isinstance(data[args.url], unicode): + if isinstance(data[args.url], str): data[args.url]["token"] = data[args.url] if args.bearer_token != "": diff --git a/events/src/utils.py b/events/src/utils.py index f405e44ac70..6d4e0791a2b 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -9,27 +9,34 @@ # cases as published by the Free Software Foundation. # +import sys import json import os import logging +import logging.handlers import fcntl from errno import EBADF from threading import Thread import multiprocessing -from Queue import Queue +try: + from queue import Queue +except ImportError: + from Queue import Queue from datetime import datetime, timedelta import base64 import hmac from hashlib import sha256 from calendar import timegm -from eventsapiconf import (LOG_FILE, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - UUID_FILE, - CERTS_DIR) -import eventtypes +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from gfevents.eventsapiconf import (LOG_FILE, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + UUID_FILE, + CERTS_DIR) +from gfevents import eventtypes # Webhooks list @@ -92,7 +99,7 @@ def setup_logger(): logger.setLevel(logging.INFO) # create the logging file handler - fh = logging.FileHandler(LOG_FILE) + fh = logging.handlers.WatchedFileHandler(LOG_FILE) formatter = logging.Formatter("[%(asctime)s] %(levelname)s " "[%(module)s - %(lineno)s:%(funcName)s] " @@ -206,7 +213,7 @@ def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60): msg = base64_urlencode(header) + "." + base64_urlencode(payload) return "%s.%s" % ( msg, - base64_urlencode(hmac.HMAC(secret, msg, sha256).digest()) + base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest()) ) @@ -384,7 +391,7 @@ class PidFile(object): def webhook_monitor(proc_queue, webhooks): queues = {} for url, data in webhooks.items(): - if isinstance(data, str) or isinstance(data, unicode): + if isinstance(data, str): token = data secret = None else: @@ -405,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks): class WebhookThreadPool(object): def start(self): - # Seperate process to emit messages to webhooks - # which maintains one thread per webhook. Seperate + # Separate process to emit messages to webhooks + # which maintains one thread per webhook. Separate # process is required since on reload we need to stop # and start the thread pool. In Python Threads can't be stopped # so terminate the process and start again. Note: In transit |
