diff options
Diffstat (limited to 'events/src')
| -rw-r--r-- | events/src/Makefile.am | 4 | ||||
| -rw-r--r-- | events/src/eventsapiconf.py.in | 5 | ||||
| -rw-r--r-- | events/src/gf_event.py | 19 | ||||
| -rw-r--r-- | events/src/glustereventsd.py | 46 | ||||
| -rw-r--r-- | events/src/peer_eventsapi.py | 57 | ||||
| -rw-r--r-- | events/src/utils.py | 26 |
6 files changed, 96 insertions, 61 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am index 4e83a469cc2..3b229691897 100644 --- a/events/src/Makefile.am +++ b/events/src/Makefile.am @@ -5,7 +5,7 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \ BUILT_SOURCES = eventtypes.py CLEANFILES = eventtypes.py -eventsdir = $(GLUSTERFS_LIBEXECDIR)/events +eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents if BUILD_EVENTS events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \ utils.py @@ -28,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py install-exec-hook: $(mkdir_p) $(DESTDIR)$(sbindir) rm -f $(DESTDIR)$(sbindir)/glustereventsd - ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \ + ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \ $(DESTDIR)$(sbindir)/glustereventsd rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \ diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index 003cb981572..700093bee60 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -18,7 +18,8 @@ def get_glusterd_workdir(): if glusterd_workdir is not None: return glusterd_workdir proc = subprocess.Popen(["gluster", "system::", "getwd"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines = True) out, err = proc.communicate() if proc.returncode == 0: glusterd_workdir = out.strip() @@ -27,6 +28,8 @@ def get_glusterd_workdir(): return glusterd_workdir SERVER_ADDRESS = "0.0.0.0" +SERVER_ADDRESSv4 = "0.0.0.0" +SERVER_ADDRESSv6 = "::1" DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC diff --git a/events/src/gf_event.py b/events/src/gf_event.py index fd272434242..260b0d9aa48 100644 --- a/events/src/gf_event.py +++ b/events/src/gf_event.py @@ -1,4 +1,3 @@ -#!/usr/bin/python2 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,10 +12,10 @@ import socket import time -from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED -from eventtypes import all_events +from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED +from gfevents.eventtypes import all_events -from utils import logger, setup_logger, get_config +from gfevents.utils import logger, setup_logger, get_config # Run this when this lib loads setup_logger() @@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs): logger.error("Unable to connect to events Server: {0}".format(e)) return + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") + return + # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. msg = "" for k, v in kwargs.items(): msg += "{0}={1};".format(k, v) # <TIMESTAMP> <EVENT_TYPE> <MSG> - msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) - - port = get_config("port") - if port is None: - logger.error("Unable to get eventsd port details") - return + msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode() try: sent = client.sendto(msg, (SERVER_ADDRESS, port)) diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 014f5a0009d..341a3b60947 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,6 +13,7 @@ from __future__ import print_function import sys import signal +import threading try: import socketserver except ImportError: @@ -23,15 +24,25 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS, PID_FILE +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES from utils import logger, PidFile, PidFileLockFailed, boolify +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET -class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + logger.debug("EVENT: {0} from {1}".format(repr(data), self.client_address[0])) try: @@ -49,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): logger.warn("Unable to parse Event {0}".format(data)) return - for k, v in data_dict.iteritems(): + for k, v in data_dict.items(): try: if k in AUTO_BOOL_ATTRIBUTES: data_dict[k] = boolify(v) @@ -86,6 +97,10 @@ def signal_handler_sigusr2(sig, frame): utils.restart_webhook_pool() +def UDP_server_thread(sock): + sock.serve_forever() + + def init_event_server(): utils.setup_logger() utils.load_all() @@ -96,15 +111,26 @@ def init_event_server(): sys.stderr.write("Unable to get Port details from Config\n") sys.exit(1) - # Start the Eventing Server, UDP Server + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets try: - server = SocketServer.ThreadingUDPServer( - (SERVER_ADDRESS, port), - GlusterEventsRequestHandler) + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) except socket.error as e: - sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) sys.exit(1) - server.serve_forever() + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() def get_args(): diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index fb11e12deb4..4d2e5f35b1c 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -27,28 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, output_error, execute_in_peers, runcli, set_common_args_func) -from events.utils import LockedOpen, get_jwt_token, save_https_cert - -from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - CUSTOM_CONFIG_FILE_TO_SYNC, - EVENTSD, - CONFIG_KEYS, - BOOL_CONFIGS, - INT_CONFIGS, - PID_FILE, - RESTART_CONFIGS, - ERROR_INVALID_CONFIG, - ERROR_WEBHOOK_NOT_EXISTS, - ERROR_CONFIG_SYNC_FAILED, - ERROR_WEBHOOK_ALREADY_EXISTS, - ERROR_PARTIAL_SUCCESS, - ERROR_ALL_NODES_STATUS_NOT_OK, - ERROR_SAME_CONFIG, - ERROR_WEBHOOK_SYNC_FAILED, - CERTS_DIR) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) def handle_output_error(err, errcode=1, json_output=False): @@ -173,8 +173,10 @@ def sync_to_peers(args): try: sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Webhooks file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_WEBHOOK_SYNC_FAILED, json_output=args.json) @@ -182,8 +184,10 @@ def sync_to_peers(args): try: sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Config file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_CONFIG_SYNC_FAILED, json_output=args.json) @@ -349,8 +353,7 @@ class WebhookModCmd(Cmd): errcode=ERROR_WEBHOOK_NOT_EXISTS, json_output=args.json) - if isinstance(data[args.url], str) or \ - isinstance(data[args.url], unicode): + if isinstance(data[args.url], str): data[args.url]["token"] = data[args.url] if args.bearer_token != "": diff --git a/events/src/utils.py b/events/src/utils.py index c1c0bccfe66..6d4e0791a2b 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -9,9 +9,11 @@ # cases as published by the Free Software Foundation. # +import sys import json import os import logging +import logging.handlers import fcntl from errno import EBADF from threading import Thread @@ -26,13 +28,15 @@ import hmac from hashlib import sha256 from calendar import timegm -from eventsapiconf import (LOG_FILE, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - UUID_FILE, - CERTS_DIR) -import eventtypes +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from gfevents.eventsapiconf import (LOG_FILE, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + UUID_FILE, + CERTS_DIR) +from gfevents import eventtypes # Webhooks list @@ -95,7 +99,7 @@ def setup_logger(): logger.setLevel(logging.INFO) # create the logging file handler - fh = logging.FileHandler(LOG_FILE) + fh = logging.handlers.WatchedFileHandler(LOG_FILE) formatter = logging.Formatter("[%(asctime)s] %(levelname)s " "[%(module)s - %(lineno)s:%(funcName)s] " @@ -387,7 +391,7 @@ class PidFile(object): def webhook_monitor(proc_queue, webhooks): queues = {} for url, data in webhooks.items(): - if isinstance(data, str) or isinstance(data, unicode): + if isinstance(data, str): token = data secret = None else: @@ -408,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks): class WebhookThreadPool(object): def start(self): - # Seperate process to emit messages to webhooks - # which maintains one thread per webhook. Seperate + # Separate process to emit messages to webhooks + # which maintains one thread per webhook. Separate # process is required since on reload we need to stop # and start the thread pool. In Python Threads can't be stopped # so terminate the process and start again. Note: In transit |
