summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/Makefile.am24
-rw-r--r--events/src/__init__.py10
-rw-r--r--events/src/eventsapiconf.py.in22
-rw-r--r--events/src/eventsconfig.json3
-rw-r--r--events/src/eventtypes.py9
-rw-r--r--events/src/glustereventsd.py151
-rw-r--r--events/src/handlers.py21
-rw-r--r--events/src/peer_eventsapi.py521
-rw-r--r--events/src/utils.py150
9 files changed, 911 insertions, 0 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
new file mode 100644
index 00000000000..528f0208fe2
--- /dev/null
+++ b/events/src/Makefile.am
@@ -0,0 +1,24 @@
+EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in eventtypes.py \
+ handlers.py utils.py peer_eventsapi.py eventsconfig.json
+
+eventsdir = $(libexecdir)/glusterfs/events
+eventspeerscriptdir = $(libexecdir)/glusterfs
+eventsconfdir = $(sysconfdir)/glusterfs
+eventsconf_DATA = eventsconfig.json
+
+events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py utils.py
+events_SCRIPTS = glustereventsd.py
+eventspeerscript_SCRIPTS = peer_eventsapi.py
+
+install-exec-hook:
+ $(mkdir_p) $(DESTDIR)$(sbindir)
+ rm -f $(DESTDIR)$(sbindir)/glustereventsd
+ ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \
+ $(DESTDIR)$(sbindir)/glustereventsd
+ rm -f $(DESTDIR)$(sbindir)/gluster-eventing
+ ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \
+ $(DESTDIR)$(sbindir)/gluster-eventsapi
+
+uninstall-hook:
+ rm -f $(DESTDIR)$(sbindir)/glustereventsd
+ rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
diff --git a/events/src/__init__.py b/events/src/__init__.py
new file mode 100644
index 00000000000..f27c53a4df4
--- /dev/null
+++ b/events/src/__init__.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
new file mode 100644
index 00000000000..702e1d21820
--- /dev/null
+++ b/events/src/eventsapiconf.py.in
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
+CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
+CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
+WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
+EVENTSD = "glustereventsd"
+CONFIG_KEYS = ["log_level"]
+BOOL_CONFIGS = []
+RESTART_CONFIGS = []
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
new file mode 100644
index 00000000000..ce2c775f0bd
--- /dev/null
+++ b/events/src/eventsconfig.json
@@ -0,0 +1,3 @@
+{
+ "log_level": "INFO"
+}
diff --git a/events/src/eventtypes.py b/events/src/eventtypes.py
new file mode 100644
index 00000000000..4812e659de3
--- /dev/null
+++ b/events/src/eventtypes.py
@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+all_events = [
+ "EVENT_PEER_ATTACH",
+ "EVENT_PEER_DETACH",
+ "EVENT_VOLUME_CREATE",
+ "EVENT_VOLUME_START",
+ "EVENT_VOLUME_STOP",
+ "EVENT_VOLUME_DELETE",
+]
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
new file mode 100644
index 00000000000..3fa57686a8b
--- /dev/null
+++ b/events/src/glustereventsd.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import asyncore
+import socket
+import os
+from multiprocessing import Process, Queue
+import sys
+import signal
+
+from eventtypes import all_events
+import handlers
+import utils
+from eventsapiconf import SERVER_ADDRESS
+from utils import logger
+
+# Global Queue, EventsHandler will add items to the queue
+# and process_event will gets each item and handles it
+events_queue = Queue()
+events_server_pid = None
+
+
+def process_event():
+ """
+ Seperate process which handles all the incoming events from Gluster
+ processes.
+ """
+ while True:
+ data = events_queue.get()
+ logger.debug("EVENT: {0}".format(repr(data)))
+ try:
+ # Event Format <TIMESTAMP> <TYPE> <DETAIL>
+ ts, key, value = data.split(" ", 2)
+ except ValueError:
+ logger.warn("Invalid Event Format {0}".format(data))
+ continue
+
+ data_dict = {}
+ try:
+ # Format key=value;key=value
+ data_dict = dict(x.split('=') for x in value.split(';'))
+ except ValueError:
+ logger.warn("Unable to parse Event {0}".format(data))
+ continue
+
+ try:
+ # Event Type to Function Map, Recieved event data will be in
+ # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the
+ # recieved Type/Key and construct a function name starting with
+ # handle_ For example: handle_event_volume_create
+ func_name = "handle_" + all_events[int(key)].lower()
+ except IndexError:
+ # This type of Event is not handled?
+ logger.warn("Unhandled Event: {0}".format(key))
+ func_name = None
+
+ if func_name is not None:
+ # Get function from handlers module
+ func = getattr(handlers, func_name, None)
+ # If func is None, then handler unimplemented for that event.
+ if func is not None:
+ func(ts, int(key), data_dict)
+ else:
+ # Generic handler, broadcast whatever received
+ handlers.generic_handler(ts, int(key), data_dict)
+
+
+def process_event_wrapper():
+ try:
+ process_event()
+ except KeyboardInterrupt:
+ return
+
+
+class GlusterEventsHandler(asyncore.dispatcher_with_send):
+
+ def handle_read(self):
+ data = self.recv(8192)
+ if data:
+ events_queue.put(data)
+ self.send(data)
+
+
+class GlusterEventsServer(asyncore.dispatcher):
+
+ def __init__(self):
+ global events_server_pid
+ asyncore.dispatcher.__init__(self)
+ # Start the Events listener process which listens to
+ # the global queue
+ p = Process(target=process_event_wrapper)
+ p.start()
+ events_server_pid = p.pid
+
+ # Create UNIX Domain Socket, bind to path
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.bind(SERVER_ADDRESS)
+ self.listen(5)
+
+ def handle_accept(self):
+ pair = self.accept()
+ if pair is not None:
+ sock, addr = pair
+ GlusterEventsHandler(sock)
+
+
+def signal_handler_sigusr2(sig, frame):
+ if events_server_pid is not None:
+ os.kill(events_server_pid, signal.SIGUSR2)
+ utils.load_all()
+
+
+def init_event_server():
+ utils.setup_logger()
+
+ # Delete Socket file if Exists
+ try:
+ os.unlink(SERVER_ADDRESS)
+ except OSError:
+ if os.path.exists(SERVER_ADDRESS):
+ print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
+ file=sys.stderr)
+ sys.exit(1)
+
+ utils.load_all()
+
+ # Start the Eventing Server, UNIX DOMAIN SOCKET Server
+ GlusterEventsServer()
+ asyncore.loop()
+
+
+def main():
+ try:
+ init_event_server()
+ except KeyboardInterrupt:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ signal.signal(signal.SIGUSR2, signal_handler_sigusr2)
+ main()
diff --git a/events/src/handlers.py b/events/src/handlers.py
new file mode 100644
index 00000000000..9b756a91d51
--- /dev/null
+++ b/events/src/handlers.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import utils
+
+
+def generic_handler(ts, key, data):
+ """
+ Generic handler to broadcast message to all peers, custom handlers
+ can be created by func name handler_<event_name>
+ Ex: handle_event_volume_create(ts, key, data)
+ """
+ utils.publish(ts, key, data)
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
new file mode 100644
index 00000000000..7887d77351c
--- /dev/null
+++ b/events/src/peer_eventsapi.py
@@ -0,0 +1,521 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import os
+import json
+from errno import EEXIST
+
+import requests
+import fasteners
+from prettytable import PrettyTable
+
+from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
+ sync_file_to_peers, GlusterCmdException,
+ output_error, execute_in_peers, runcli)
+
+from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ RESTART_CONFIGS)
+
+
+def file_content_overwrite(fname, data):
+ with open(fname + ".tmp", "w") as f:
+ f.write(json.dumps(data))
+
+ os.rename(fname + ".tmp", fname)
+
+
+def create_custom_config_file_if_not_exists():
+ mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE))
+ if not os.path.exists(CUSTOM_CONFIG_FILE):
+ with open(CUSTOM_CONFIG_FILE, "w") as f:
+ f.write("{}")
+
+
+def create_webhooks_file_if_not_exists():
+ mkdirp(os.path.dirname(WEBHOOKS_FILE))
+ if not os.path.exists(WEBHOOKS_FILE):
+ with open(WEBHOOKS_FILE, "w") as f:
+ f.write("{}")
+
+
+def boolify(value):
+ val = False
+ if value.lower() in ["enabled", "true", "on", "yes"]:
+ val = True
+ return val
+
+
+def mkdirp(path, exit_on_err=False, logger=None):
+ """
+ Try creating required directory structure
+ ignore EEXIST and raise exception for rest of the errors.
+ Print error in stderr and exit
+ """
+ try:
+ os.makedirs(path)
+ except (OSError, IOError) as e:
+ if e.errno == EEXIST and os.path.isdir(path):
+ pass
+ else:
+ output_error("Fail to create dir %s: %s" % (path, e))
+
+
+def is_enabled(service):
+ rc, out, err = execute(["systemctl", "is-enabled", service])
+ return rc == 0
+
+
+def is_active(service):
+ rc, out, err = execute(["systemctl", "is-active", service])
+ return rc == 0
+
+
+def enable_service(service):
+ if not is_enabled(service):
+ cmd = ["systemctl", "enable", service]
+ return execute(cmd)
+
+ return (0, "", "")
+
+
+def disable_service(service):
+ if is_enabled(service):
+ cmd = ["systemctl", "disable", service]
+ return execute(cmd)
+
+ return (0, "", "")
+
+
+def start_service(service):
+ rc, out, err = enable_service(service)
+ if rc != 0:
+ return (rc, out, err)
+
+ cmd = ["systemctl", "start", service]
+ return execute(cmd)
+
+
+def stop_service(service):
+ rc, out, err = disable_service(service)
+ if rc != 0:
+ return (rc, out, err)
+
+ cmd = ["systemctl", "stop", service]
+ return execute(cmd)
+
+
+def restart_service(service):
+ rc, out, err = stop_service(service)
+ if rc != 0:
+ return (rc, out, err)
+
+ return start_service(service)
+
+
+def reload_service(service):
+ if is_active(service):
+ cmd = ["systemctl", "reload", service]
+ return execute(cmd)
+
+ return (0, "", "")
+
+
+def sync_to_peers(restart=False):
+ if os.path.exists(WEBHOOKS_FILE):
+ try:
+ sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
+ except GlusterCmdException as e:
+ output_error("Failed to sync Webhooks file: [Error: {0}]"
+ "{1}".format(e[0], e[2]))
+
+ if os.path.exists(CUSTOM_CONFIG_FILE):
+ try:
+ sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
+ except GlusterCmdException as e:
+ output_error("Failed to sync Config file: [Error: {0}]"
+ "{1}".format(e[0], e[2]))
+
+ action = "node-reload"
+ if restart:
+ action = "node-restart"
+
+ out = execute_in_peers(action)
+ table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["SYNC STATUS"] = "r"
+
+ for p in out:
+ table.add_row([p.hostname,
+ "UP" if p.node_up else "DOWN",
+ "OK" if p.ok else "NOT OK: {0}".format(
+ p.error)])
+
+ print (table)
+
+
+def node_output_handle(resp):
+ rc, out, err = resp
+ if rc == 0:
+ node_output_ok(out)
+ else:
+ node_output_notok(err)
+
+
+def action_handle(action):
+ out = execute_in_peers("node-" + action)
+ column_name = action.upper()
+ if action == "status":
+ column_name = EVENTSD.upper()
+
+ table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align[column_name + " STATUS"] = "r"
+
+ for p in out:
+ status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
+ p.error)
+ if action == "status":
+ status_col_val = "DOWN"
+ if p.ok:
+ status_col_val = p.output
+
+ table.add_row([p.hostname,
+ "UP" if p.node_up else "DOWN",
+ status_col_val])
+
+ print (table)
+
+
+class NodeStart(Cmd):
+ name = "node-start"
+
+ def run(self, args):
+ node_output_handle(start_service(EVENTSD))
+
+
+class StartCmd(Cmd):
+ name = "start"
+
+ def run(self, args):
+ action_handle("start")
+
+
+class NodeStop(Cmd):
+ name = "node-stop"
+
+ def run(self, args):
+ node_output_handle(stop_service(EVENTSD))
+
+
+class StopCmd(Cmd):
+ name = "stop"
+
+ def run(self, args):
+ action_handle("stop")
+
+
+class NodeRestart(Cmd):
+ name = "node-restart"
+
+ def run(self, args):
+ node_output_handle(restart_service(EVENTSD))
+
+
+class RestartCmd(Cmd):
+ name = "restart"
+
+ def run(self, args):
+ action_handle("restart")
+
+
+class NodeReload(Cmd):
+ name = "node-reload"
+
+ def run(self, args):
+ node_output_handle(reload_service(EVENTSD))
+
+
+class ReloadCmd(Cmd):
+ name = "reload"
+
+ def run(self, args):
+ action_handle("reload")
+
+
+class NodeStatus(Cmd):
+ name = "node-status"
+
+ def run(self, args):
+ node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
+
+
+class StatusCmd(Cmd):
+ name = "status"
+
+ def run(self, args):
+ webhooks = {}
+ if os.path.exists(WEBHOOKS_FILE):
+ webhooks = json.load(open(WEBHOOKS_FILE))
+
+ print ("Webhooks: " + ("" if webhooks else "None"))
+ for w in webhooks:
+ print (w)
+
+ print ()
+ action_handle("status")
+
+
+class WebhookAddCmd(Cmd):
+ name = "webhook-add"
+
+ def args(self, parser):
+ parser.add_argument("url", help="URL of Webhook")
+ parser.add_argument("--bearer_token", "-t", help="Bearer Token",
+ default="")
+
+ def run(self, args):
+ create_webhooks_file_if_not_exists()
+
+ with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ data = json.load(open(WEBHOOKS_FILE))
+ if data.get(args.url, None) is not None:
+ output_error("Webhook already exists")
+
+ data[args.url] = args.bearer_token
+ file_content_overwrite(WEBHOOKS_FILE, data)
+
+ sync_to_peers()
+
+
+class WebhookModCmd(Cmd):
+ name = "webhook-mod"
+
+ def args(self, parser):
+ parser.add_argument("url", help="URL of Webhook")
+ parser.add_argument("--bearer_token", "-t", help="Bearer Token",
+ default="")
+
+ def run(self, args):
+ create_webhooks_file_if_not_exists()
+
+ with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ data = json.load(open(WEBHOOKS_FILE))
+ if data.get(args.url, None) is None:
+ output_error("Webhook does not exists")
+
+ data[args.url] = args.bearer_token
+ file_content_overwrite(WEBHOOKS_FILE, data)
+
+ sync_to_peers()
+
+
+class WebhookDelCmd(Cmd):
+ name = "webhook-del"
+
+ def args(self, parser):
+ parser.add_argument("url", help="URL of Webhook")
+
+ def run(self, args):
+ create_webhooks_file_if_not_exists()
+
+ with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ data = json.load(open(WEBHOOKS_FILE))
+ if data.get(args.url, None) is None:
+ output_error("Webhook does not exists")
+
+ del data[args.url]
+ file_content_overwrite(WEBHOOKS_FILE, data)
+
+ sync_to_peers()
+
+
+class NodeWebhookTestCmd(Cmd):
+ name = "node-webhook-test"
+
+ def args(self, parser):
+ parser.add_argument("url")
+ parser.add_argument("bearer_token")
+
+ def run(self, args):
+ http_headers = {}
+ if args.bearer_token != ".":
+ http_headers["Authorization"] = "Bearer " + args.bearer_token
+
+ try:
+ resp = requests.post(args.url, headers=http_headers)
+ except requests.ConnectionError as e:
+ node_output_notok("{0}".format(e))
+
+ if resp.status_code != 200:
+ node_output_notok("{0}".format(resp.status_code))
+
+ node_output_ok()
+
+
+class WebhookTestCmd(Cmd):
+ name = "webhook-test"
+
+ def args(self, parser):
+ parser.add_argument("url", help="URL of Webhook")
+ parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+
+ def run(self, args):
+ url = args.url
+ bearer_token = args.bearer_token
+ if not args.url:
+ url = "."
+ if not args.bearer_token:
+ bearer_token = "."
+
+ out = execute_in_peers("node-webhook-test", [url, bearer_token])
+
+ table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["WEBHOOK STATUS"] = "r"
+
+ for p in out:
+ table.add_row([p.hostname,
+ "UP" if p.node_up else "DOWN",
+ "OK" if p.ok else "NOT OK: {0}".format(
+ p.error)])
+
+ print (table)
+
+
+class ConfigGetCmd(Cmd):
+ name = "config-get"
+
+ def args(self, parser):
+ parser.add_argument("--name", help="Config Name")
+
+ def run(self, args):
+ data = json.load(open(DEFAULT_CONFIG_FILE))
+ if os.path.exists(CUSTOM_CONFIG_FILE):
+ data.update(json.load(open(CUSTOM_CONFIG_FILE)))
+
+ if args.name is not None and args.name not in CONFIG_KEYS:
+ output_error("Invalid Config item")
+
+ table = PrettyTable(["NAME", "VALUE"])
+ if args.name is None:
+ for k, v in data.items():
+ table.add_row([k, v])
+ else:
+ table.add_row([args.name, data[args.name]])
+
+ print (table)
+
+
+def read_file_content_json(fname):
+ content = "{}"
+ with open(fname) as f:
+ content = f.read()
+ if content.strip() == "":
+ content = "{}"
+
+ return json.loads(content)
+
+
+class ConfigSetCmd(Cmd):
+ name = "config-set"
+
+ def args(self, parser):
+ parser.add_argument("name", help="Config Name")
+ parser.add_argument("value", help="Config Value")
+
+ def run(self, args):
+ if args.name not in CONFIG_KEYS:
+ output_error("Invalid Config item")
+
+ with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ data = json.load(open(DEFAULT_CONFIG_FILE))
+ if os.path.exists(CUSTOM_CONFIG_FILE):
+ config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
+ data.update(config_json)
+
+ # Do Nothing if same as previous value
+ if data[args.name] == args.value:
+ return
+
+ # TODO: Validate Value
+ create_custom_config_file_if_not_exists()
+ new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
+
+ v = args.value
+ if args.name in BOOL_CONFIGS:
+ v = boolify(args.value)
+
+ new_data[args.name] = v
+ file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
+
+ # If any value changed which requires restart of REST server
+ restart = False
+ if args.name in RESTART_CONFIGS:
+ restart = True
+
+ sync_to_peers(restart=restart)
+
+
+class ConfigResetCmd(Cmd):
+ name = "config-reset"
+
+ def args(self, parser):
+ parser.add_argument("name", help="Config Name or all")
+
+ def run(self, args):
+ with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ changed_keys = []
+ data = {}
+ if os.path.exists(CUSTOM_CONFIG_FILE):
+ data = read_file_content_json(CUSTOM_CONFIG_FILE)
+
+ if not data:
+ return
+
+ if args.name.lower() == "all":
+ for k, v in data.items():
+ changed_keys.append(k)
+
+ # Reset all keys
+ file_content_overwrite(CUSTOM_CONFIG_FILE, {})
+ else:
+ changed_keys.append(args.name)
+ del data[args.name]
+ file_content_overwrite(CUSTOM_CONFIG_FILE, data)
+
+ # If any value changed which requires restart of REST server
+ restart = False
+ for key in changed_keys:
+ if key in RESTART_CONFIGS:
+ restart = True
+ break
+
+ sync_to_peers(restart=restart)
+
+
+class SyncCmd(Cmd):
+ name = "sync"
+
+ def run(self, args):
+ sync_to_peers()
+
+
+if __name__ == "__main__":
+ runcli()
diff --git a/events/src/utils.py b/events/src/utils.py
new file mode 100644
index 00000000000..772221a1e25
--- /dev/null
+++ b/events/src/utils.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import json
+import os
+import logging
+
+import requests
+from eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE)
+import eventtypes
+
+from gluster.cliutils import get_node_uuid
+
+
+# Webhooks list
+_webhooks = {}
+# Default Log Level
+_log_level = "INFO"
+# Config Object
+_config = {}
+
+# Init Logger instance
+logger = logging.getLogger(__name__)
+
+
+def get_event_type_name(idx):
+ """
+ Returns Event Type text from the index. For example, VOLUME_CREATE
+ """
+ return eventtypes.all_events[idx].replace("EVENT_", "")
+
+
+def setup_logger():
+ """
+ Logging initialization, Log level by default will be INFO, once config
+ file is read, respective log_level will be set.
+ """
+ global logger
+ logger.setLevel(logging.INFO)
+
+ # create the logging file handler
+ fh = logging.FileHandler(LOG_FILE)
+
+ formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
+ "[%(module)s - %(lineno)s:%(funcName)s] "
+ "- %(message)s")
+
+ fh.setFormatter(formatter)
+
+ # add handler to logger object
+ logger.addHandler(fh)
+
+
+def load_config():
+ """
+ Load/Reload the config from REST Config files. This function will
+ be triggered during init and when SIGUSR2.
+ """
+ global _config
+ _config = {}
+ if os.path.exists(DEFAULT_CONFIG_FILE):
+ _config = json.load(open(DEFAULT_CONFIG_FILE))
+ if os.path.exists(CUSTOM_CONFIG_FILE):
+ _config.update(json.load(open(CUSTOM_CONFIG_FILE)))
+
+
+def load_log_level():
+ """
+ Reads log_level from Config file and sets accordingly. This function will
+ be triggered during init and when SIGUSR2.
+ """
+ global logger, _log_level
+ new_log_level = _config.get("log_level", "INFO")
+ if _log_level != new_log_level:
+ logger.setLevel(getattr(logging, new_log_level.upper()))
+ _log_level = new_log_level.upper()
+
+
+def load_webhooks():
+ """
+ Load/Reload the webhooks list. This function will
+ be triggered during init and when SIGUSR2.
+ """
+ global _webhooks
+ _webhooks = {}
+ if os.path.exists(WEBHOOKS_FILE):
+ _webhooks = json.load(open(WEBHOOKS_FILE))
+
+
+def load_all():
+ """
+ Wrapper function to call all load/reload functions. This function will
+ be triggered during init and when SIGUSR2.
+ """
+ load_config()
+ load_webhooks()
+ load_log_level()
+
+
+def publish(ts, event_key, data):
+ message = {
+ "nodeid": get_node_uuid(),
+ "ts": int(ts),
+ "event": get_event_type_name(event_key),
+ "message": data
+ }
+ if _webhooks:
+ plugin_webhook(message)
+ else:
+ # TODO: Default action?
+ pass
+
+
+def plugin_webhook(message):
+ message_json = json.dumps(message, sort_keys=True)
+ logger.debug("EVENT: {0}".format(message_json))
+ for url, token in _webhooks.items():
+ http_headers = {"Content-Type": "application/json"}
+ if token != "" and token is not None:
+ http_headers["Authorization"] = "Bearer " + token
+
+ try:
+ resp = requests.post(url, headers=http_headers, data=message_json)
+ except requests.ConnectionError as e:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status: {error}".format(
+ url=url,
+ event=message_json,
+ error=e))
+ continue
+
+ if resp.status_code != 200:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status Code: {status_code}".format(
+ url=url,
+ event=message_json,
+ status_code=resp.status_code))