diff options
author | Aravinda VK <avishwan@redhat.com> | 2016-08-17 13:46:00 +0530 |
---|---|---|
committer | Atin Mukherjee <amukherj@redhat.com> | 2016-08-30 18:34:59 -0700 |
commit | b71ae7d77d7ab1581d266f6435d134958844d0db (patch) | |
tree | 1e2044ee1c7d7ec4dc5a620693484fb091ffb2c9 /events/src/glustereventsd.py | |
parent | c1f5cf0bda47fc34725084ee3988b0efe2dcfc8a (diff) |
eventsapi: Add support for Client side Events
Client side gf_event uses ctx->cmd_args.volfile_server to push
notifications to the eventsd.
Socket server changed from Unix domain socket to UDP to support
external events.
Following to be addressed in different patch
- Port used for eventsd is 24009. Make it configurable
Already configurable in Server side. Configurable in gf_event API
is required.
- Auth Token yet to be added as discussed in
https://www.gluster.org/pipermail/gluster-devel/2016-August/050324.html
Change-Id: I159acf80b681d10b82d52cfb3ffdf85cb896542d
BUG: 1367774
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15189
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Diffstat (limited to 'events/src/glustereventsd.py')
-rw-r--r-- | events/src/glustereventsd.py | 95 |
1 files changed, 23 insertions, 72 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..91a0743ff22 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -11,12 +11,10 @@ # from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue import sys import signal +import SocketServer +import socket from eventtypes import all_events import handlers @@ -24,26 +22,19 @@ import utils from eventsapiconf import SERVER_ADDRESS from utils import logger -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None +class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): -def process_event(): - """ - Seperate process which handles all the incoming events from Gluster - processes. - """ - while True: - data = events_queue.get() - logger.debug("EVENT: {0}".format(repr(data))) + def handle(self): + data = self.request[0].strip() + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) try: # Event Format <TIMESTAMP> <TYPE> <DETAIL> ts, key, value = data.split(" ", 2) except ValueError: logger.warn("Invalid Event Format {0}".format(data)) - continue + return data_dict = {} try: @@ -51,7 +42,7 @@ def process_event(): data_dict = dict(x.split('=') for x in value.split(';')) except ValueError: logger.warn("Unable to parse Event {0}".format(data)) - continue + return try: # Event Type to Function Map, Recieved event data will be in @@ -75,68 +66,28 @@ def process_event(): handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): - try: - process_event() - except KeyboardInterrupt: - return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - - def handle_read(self): - data = self.recv(8192) - if data: - events_queue.put(data) - self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - - def __init__(self): - global events_server_pid - asyncore.dispatcher.__init__(self) - # Start the Events listener process which listens to - # the global queue - p = Process(target=process_event_wrapper) - p.start() - events_server_pid = p.pid - - # Create UNIX Domain Socket, bind to path - self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.bind(SERVER_ADDRESS) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - GlusterEventsHandler(sock) - - def signal_handler_sigusr2(sig, frame): - if events_server_pid is not None: - os.kill(events_server_pid, signal.SIGUSR2) utils.load_all() def init_event_server(): utils.setup_logger() - - # Delete Socket file if Exists - try: - os.unlink(SERVER_ADDRESS) - except OSError: - if os.path.exists(SERVER_ADDRESS): - print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), - file=sys.stderr) - sys.exit(1) - utils.load_all() - # Start the Eventing Server, UNIX DOMAIN SOCKET Server - GlusterEventsServer() - asyncore.loop() + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Start the Eventing Server, UDP Server + try: + server = SocketServer.ThreadingUDPServer( + (SERVER_ADDRESS, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.exit(1) + server.serve_forever() def main(): |