summaryrefslogtreecommitdiffstats
path: root/events/src/glustereventsd.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2016-05-05 18:34:41 +0530
committerJeff Darcy <jdarcy@redhat.com>2016-07-18 13:52:20 -0700
commit5ed781ecf531b7916e51c174426e222dab717fb8 (patch)
tree62fdb63045d4bc9350713aeaa3a8639a316b24ed /events/src/glustereventsd.py
parent386340967926fa062e2c1eee4784f951846167b3 (diff)
eventsapi: Gluster Eventing Feature implementation
[Depends on http://review.gluster.org/14627] Design is available in `glusterfs-specs`, A change from the design is support of webhook instead of Websockets as discussed in the design http://review.gluster.org/13115 Since Websocket support depends on REST APIs, I will add Websocket support once REST APIs patch gets merged Usage: Run following command to start/stop Eventsapi server in all Peers, which will collect the notifications from any Gluster daemon and emits to configured client. gluster-eventsapi start|stop|restart|reload Status of running services can be checked using, gluster-eventsapi status Events listener is a HTTP(S) server which listens to events emited by the Gluster. Create a HTTP Server to listen on POST and register that URL using, gluster-eventsapi webhook-add <URL> [--bearer-token <TOKEN>] For example, if HTTP Server running in `http://192.168.122.188:9000` then add that URL using, gluster-eventsapi webhook-add http://192.168.122.188:9000 If it expects a Token then specify it using `--bearer-token` or `-t` We can also test Webhook if all peer nodes can send message or not using, gluster-eventsapi webhook-test <URL> [--bearer-token <TOKEN>] Configurations can be viewed/updated using, gluster-eventsapi config-get [--name] gluster-eventsapi config-set <NAME> <VALUE> gluster-eventsapi config-reset <NAME|all> If any one peer node was down during config-set/reset or webhook modifications, Run sync command from good node when a peer node comes back. Automatic update is not yet implemented. gluster-eventsapi sync Basic Events Client(HTTP Server) is included with the code, Start running the client with required port and start listening to the events. /usr/share/glusterfs/scripts/eventsdash.py --port 8080 Default port is 9000, if no port is specified, once it started running then configure gluster-eventsapi to send events to that client. Eventsapi Client can be outside of the Cluster, it can be run event on Windows. But only requirement is the client URL should be accessible by all peer nodes.(Or ngrok(https://ngrok.com) like tools can be used) Events implemented with this patch, - Volume Create - Volume Start - Volume Stop - Volume Delete - Peer Attach - Peer Detach It is easy to add/support more events, since it touches Gluster cmd code and to avoid merge conflicts I will add support for more events once this patch merges. BUG: 1334044 Change-Id: I316827ac9dd1443454df7deffe4f54835f7f6a08 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/14248 Smoke: Gluster Build System <jenkins@build.gluster.org> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> CentOS-regression: Gluster Build System <jenkins@build.gluster.org> Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'events/src/glustereventsd.py')
-rw-r--r--events/src/glustereventsd.py151
1 files changed, 151 insertions, 0 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
new file mode 100644
index 00000000000..3fa57686a8b
--- /dev/null
+++ b/events/src/glustereventsd.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import asyncore
+import socket
+import os
+from multiprocessing import Process, Queue
+import sys
+import signal
+
+from eventtypes import all_events
+import handlers
+import utils
+from eventsapiconf import SERVER_ADDRESS
+from utils import logger
+
+# Global Queue, EventsHandler will add items to the queue
+# and process_event will gets each item and handles it
+events_queue = Queue()
+events_server_pid = None
+
+
+def process_event():
+ """
+ Seperate process which handles all the incoming events from Gluster
+ processes.
+ """
+ while True:
+ data = events_queue.get()
+ logger.debug("EVENT: {0}".format(repr(data)))
+ try:
+ # Event Format <TIMESTAMP> <TYPE> <DETAIL>
+ ts, key, value = data.split(" ", 2)
+ except ValueError:
+ logger.warn("Invalid Event Format {0}".format(data))
+ continue
+
+ data_dict = {}
+ try:
+ # Format key=value;key=value
+ data_dict = dict(x.split('=') for x in value.split(';'))
+ except ValueError:
+ logger.warn("Unable to parse Event {0}".format(data))
+ continue
+
+ try:
+ # Event Type to Function Map, Recieved event data will be in
+ # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the
+ # recieved Type/Key and construct a function name starting with
+ # handle_ For example: handle_event_volume_create
+ func_name = "handle_" + all_events[int(key)].lower()
+ except IndexError:
+ # This type of Event is not handled?
+ logger.warn("Unhandled Event: {0}".format(key))
+ func_name = None
+
+ if func_name is not None:
+ # Get function from handlers module
+ func = getattr(handlers, func_name, None)
+ # If func is None, then handler unimplemented for that event.
+ if func is not None:
+ func(ts, int(key), data_dict)
+ else:
+ # Generic handler, broadcast whatever received
+ handlers.generic_handler(ts, int(key), data_dict)
+
+
+def process_event_wrapper():
+ try:
+ process_event()
+ except KeyboardInterrupt:
+ return
+
+
+class GlusterEventsHandler(asyncore.dispatcher_with_send):
+
+ def handle_read(self):
+ data = self.recv(8192)
+ if data:
+ events_queue.put(data)
+ self.send(data)
+
+
+class GlusterEventsServer(asyncore.dispatcher):
+
+ def __init__(self):
+ global events_server_pid
+ asyncore.dispatcher.__init__(self)
+ # Start the Events listener process which listens to
+ # the global queue
+ p = Process(target=process_event_wrapper)
+ p.start()
+ events_server_pid = p.pid
+
+ # Create UNIX Domain Socket, bind to path
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.bind(SERVER_ADDRESS)
+ self.listen(5)
+
+ def handle_accept(self):
+ pair = self.accept()
+ if pair is not None:
+ sock, addr = pair
+ GlusterEventsHandler(sock)
+
+
+def signal_handler_sigusr2(sig, frame):
+ if events_server_pid is not None:
+ os.kill(events_server_pid, signal.SIGUSR2)
+ utils.load_all()
+
+
+def init_event_server():
+ utils.setup_logger()
+
+ # Delete Socket file if Exists
+ try:
+ os.unlink(SERVER_ADDRESS)
+ except OSError:
+ if os.path.exists(SERVER_ADDRESS):
+ print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
+ file=sys.stderr)
+ sys.exit(1)
+
+ utils.load_all()
+
+ # Start the Eventing Server, UNIX DOMAIN SOCKET Server
+ GlusterEventsServer()
+ asyncore.loop()
+
+
+def main():
+ try:
+ init_event_server()
+ except KeyboardInterrupt:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ signal.signal(signal.SIGUSR2, signal_handler_sigusr2)
+ main()