diff options
Diffstat (limited to 'events/src/glustereventsd.py')
| -rw-r--r-- | events/src/glustereventsd.py | 166 |
1 files changed, 87 insertions, 79 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..341a3b60947 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -11,39 +11,46 @@ # from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue import sys import signal +import threading +try: + import socketserver +except ImportError: + import SocketServer as socketserver +import socket +from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS -from utils import logger - -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None - - -def process_event(): - """ - Seperate process which handles all the incoming events from Gluster - processes. - """ - while True: - data = events_queue.get() - logger.debug("EVENT: {0}".format(repr(data))) +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE +from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES +from utils import logger, PidFile, PidFileLockFailed, boolify + +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET + +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): + + def handle(self): + data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) try: # Event Format <TIMESTAMP> <TYPE> <DETAIL> ts, key, value = data.split(" ", 2) except ValueError: logger.warn("Invalid Event Format {0}".format(data)) - continue + return data_dict = {} try: @@ -51,12 +58,22 @@ def process_event(): data_dict = dict(x.split('=') for x in value.split(';')) except ValueError: logger.warn("Unable to parse Event {0}".format(data)) - continue + return + + for k, v in data_dict.items(): + try: + if k in AUTO_BOOL_ATTRIBUTES: + data_dict[k] = boolify(v) + if k in AUTO_INT_ATTRIBUTES: + data_dict[k] = int(v) + except ValueError: + # Auto Conversion failed, Retain the old value + continue try: - # Event Type to Function Map, Recieved event data will be in + # Event Type to Function Map, Received event data will be in # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the - # recieved Type/Key and construct a function name starting with + # received Type/Key and construct a function name starting with # handle_ For example: handle_event_volume_create func_name = "handle_" + all_events[int(key)].lower() except IndexError: @@ -75,73 +92,64 @@ def process_event(): handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): - try: - process_event() - except KeyboardInterrupt: - return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - - def handle_read(self): - data = self.recv(8192) - if data: - events_queue.put(data) - self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - - def __init__(self): - global events_server_pid - asyncore.dispatcher.__init__(self) - # Start the Events listener process which listens to - # the global queue - p = Process(target=process_event_wrapper) - p.start() - events_server_pid = p.pid - - # Create UNIX Domain Socket, bind to path - self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.bind(SERVER_ADDRESS) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - GlusterEventsHandler(sock) - - def signal_handler_sigusr2(sig, frame): - if events_server_pid is not None: - os.kill(events_server_pid, signal.SIGUSR2) utils.load_all() + utils.restart_webhook_pool() + + +def UDP_server_thread(sock): + sock.serve_forever() def init_event_server(): utils.setup_logger() + utils.load_all() + utils.init_webhook_pool() - # Delete Socket file if Exists + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets try: - os.unlink(SERVER_ADDRESS) - except OSError: - if os.path.exists(SERVER_ADDRESS): - print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), - file=sys.stderr) - sys.exit(1) + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) + sys.exit(1) + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() - utils.load_all() - # Start the Eventing Server, UNIX DOMAIN SOCKET Server - GlusterEventsServer() - asyncore.loop() +def get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=__doc__) + parser.add_argument("-p", "--pid-file", help="PID File", + default=PID_FILE) + + return parser.parse_args() def main(): + args = get_args() try: - init_event_server() + with PidFile(args.pid_file): + init_event_server() + except PidFileLockFailed as e: + sys.stderr.write("Failed to get lock for pid file({0}): {1}".format( + args.pid_file, e)) except KeyboardInterrupt: sys.exit(1) |
