Diffstat (limited to 'events/src')
-rw-r--r--  events/src/Makefile.am          |  24
-rw-r--r--  events/src/__init__.py          |  10
-rw-r--r--  events/src/eventsapiconf.py.in  |  22
-rw-r--r--  events/src/eventsconfig.json    |   3
-rw-r--r--  events/src/eventtypes.py        |   9
-rw-r--r--  events/src/glustereventsd.py    | 151
-rw-r--r--  events/src/handlers.py          |  21
-rw-r--r--  events/src/peer_eventsapi.py    | 521
-rw-r--r--  events/src/utils.py             | 150
9 files changed, 911 insertions, 0 deletions
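The daemon introduced below (glustereventsd.py) listens on a UNIX domain socket and hands every received message to process_event(), which expects the form "<TIMESTAMP> <TYPE_INDEX> key=value;key=value", where the index points into all_events in eventtypes.py. The following client is only an illustrative sketch: it assumes @localstatedir@ expands to /var (so the socket is /var/run/gluster/events.sock) and uses made-up detail fields.

import socket
import time

# Connect to the stream socket created by GlusterEventsServer
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect("/var/run/gluster/events.sock")  # assumed expansion of SERVER_ADDRESS

# Index 2 of all_events is EVENT_VOLUME_CREATE; name/bricks are hypothetical fields
msg = "{0} 2 name=gv1;bricks=2".format(int(time.time()))
sock.send(msg.encode())

# GlusterEventsHandler echoes the accepted payload back
print(sock.recv(8192))
sock.close()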
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
new file mode 100644
index 00000000000..528f0208fe2
--- /dev/null
+++ b/events/src/Makefile.am
@@ -0,0 +1,24 @@
+EXTRA_DIST = glustereventsd.py __init__.py  eventsapiconf.py.in eventtypes.py \
+	handlers.py utils.py peer_eventsapi.py eventsconfig.json
+
+eventsdir = $(libexecdir)/glusterfs/events
+eventspeerscriptdir = $(libexecdir)/glusterfs
+eventsconfdir = $(sysconfdir)/glusterfs
+eventsconf_DATA = eventsconfig.json
+
+events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py utils.py
+events_SCRIPTS = glustereventsd.py
+eventspeerscript_SCRIPTS = peer_eventsapi.py
+
+install-exec-hook:
+	$(mkdir_p) $(DESTDIR)$(sbindir)
+	rm -f $(DESTDIR)$(sbindir)/glustereventsd
+	ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \
+		$(DESTDIR)$(sbindir)/glustereventsd
+	rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
+	ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \
+		$(DESTDIR)$(sbindir)/gluster-eventsapi
+
+uninstall-hook:
+	rm -f $(DESTDIR)$(sbindir)/glustereventsd
+	rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
diff --git a/events/src/__init__.py b/events/src/__init__.py
new file mode 100644
index 00000000000..f27c53a4df4
--- /dev/null
+++ b/events/src/__init__.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
new file mode 100644
index 00000000000..702e1d21820
--- /dev/null
+++ b/events/src/eventsapiconf.py.in
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
+
+SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
+CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
+CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
+WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
+EVENTSD = "glustereventsd"
+CONFIG_KEYS = ["log_level"]
+BOOL_CONFIGS = []
+RESTART_CONFIGS = []
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
new file mode 100644
index 00000000000..ce2c775f0bd
--- /dev/null
+++ b/events/src/eventsconfig.json
@@ -0,0 +1,3 @@
+{
+    "log_level": "INFO"
+}
diff --git a/events/src/eventtypes.py b/events/src/eventtypes.py
new file mode 100644
index 00000000000..4812e659de3
--- /dev/null
+++ b/events/src/eventtypes.py
@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+all_events = [
+    "EVENT_PEER_ATTACH",
+    "EVENT_PEER_DETACH",
+    "EVENT_VOLUME_CREATE",
+    "EVENT_VOLUME_START",
+    "EVENT_VOLUME_STOP",
+    "EVENT_VOLUME_DELETE",
+]
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
new file mode 100644
index 00000000000..3fa57686a8b
--- /dev/null
+++ b/events/src/glustereventsd.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import asyncore
+import socket
+import os
+from multiprocessing import Process, Queue
+import sys
+import signal
+
+from eventtypes import all_events
+import handlers
+import utils
+from eventsapiconf import SERVER_ADDRESS
+from utils import logger

+# Global Queue, EventsHandler will add items to the queue
+# and process_event gets each item and handles it
+events_queue = Queue()
+events_server_pid = None
+
+
+def process_event():
+    """
+    Separate process which handles all the incoming events from Gluster
+    processes.
+    """
+    while True:
+        data = events_queue.get()
+        logger.debug("EVENT: {0}".format(repr(data)))
+        try:
+            # Event Format <TIMESTAMP> <TYPE> <DETAIL>
+            ts, key, value = data.split(" ", 2)
+        except ValueError:
+            logger.warn("Invalid Event Format {0}".format(data))
+            continue
+
+        data_dict = {}
+        try:
+            # Format key=value;key=value
+            data_dict = dict(x.split('=') for x in value.split(';'))
+        except ValueError:
+            logger.warn("Unable to parse Event {0}".format(data))
+            continue
+
+        try:
+            # Event Type to Function Map. Received event data will be in
+            # the form <TIMESTAMP> <TYPE> <DETAIL>. Get Event name for the
+            # received Type/Key and construct a function name starting with
+            # handle_. For example: handle_event_volume_create
+            func_name = "handle_" + all_events[int(key)].lower()
+        except IndexError:
+            # This type of Event is not handled?
+            logger.warn("Unhandled Event: {0}".format(key))
+            func_name = None
+
+        if func_name is not None:
+            # Get function from handlers module
+            func = getattr(handlers, func_name, None)
+            # If func is None, then handler unimplemented for that event.
+            if func is not None:
+                func(ts, int(key), data_dict)
+            else:
+                # Generic handler, broadcast whatever received
+                handlers.generic_handler(ts, int(key), data_dict)
+
+
+def process_event_wrapper():
+    try:
+        process_event()
+    except KeyboardInterrupt:
+        return
+
+
+class GlusterEventsHandler(asyncore.dispatcher_with_send):
+
+    def handle_read(self):
+        data = self.recv(8192)
+        if data:
+            events_queue.put(data)
+            self.send(data)
+
+
+class GlusterEventsServer(asyncore.dispatcher):
+
+    def __init__(self):
+        global events_server_pid
+        asyncore.dispatcher.__init__(self)
+        # Start the Events listener process which listens to
+        # the global queue
+        p = Process(target=process_event_wrapper)
+        p.start()
+        events_server_pid = p.pid
+
+        # Create UNIX Domain Socket, bind to path
+        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.bind(SERVER_ADDRESS)
+        self.listen(5)
+
+    def handle_accept(self):
+        pair = self.accept()
+        if pair is not None:
+            sock, addr = pair
+            GlusterEventsHandler(sock)
+
+
+def signal_handler_sigusr2(sig, frame):
+    if events_server_pid is not None:
+        os.kill(events_server_pid, signal.SIGUSR2)
+    utils.load_all()
+
+
+def init_event_server():
+    utils.setup_logger()
+
+    # Delete Socket file if Exists
+    try:
+        os.unlink(SERVER_ADDRESS)
+    except OSError:
+        if os.path.exists(SERVER_ADDRESS):
+            print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
+                   file=sys.stderr)
+            sys.exit(1)
+
+    utils.load_all()
+
+    # Start the Eventing Server, UNIX DOMAIN SOCKET Server
+    GlusterEventsServer()
+    asyncore.loop()
+
+
+def main():
+    try:
+        init_event_server()
+    except KeyboardInterrupt:
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGUSR2, signal_handler_sigusr2)
+    main()
diff --git a/events/src/handlers.py b/events/src/handlers.py
new file mode 100644
index 00000000000..9b756a91d51
--- /dev/null
+++ b/events/src/handlers.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
+
+import utils
+
+
+def generic_handler(ts, key, data):
+    """
+    Generic handler to broadcast message to all peers, custom handlers
+    can be created by func name handle_<event_name>
+    Ex: handle_event_volume_create(ts, key, data)
+    """
+    utils.publish(ts, key, data)
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
new file mode 100644
index 00000000000..7887d77351c
--- /dev/null
+++ b/events/src/peer_eventsapi.py
@@ -0,0 +1,521 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import os
+import json
+from errno import EEXIST
+
+import requests
+import fasteners
+from prettytable import PrettyTable
+
+from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
+                              sync_file_to_peers, GlusterCmdException,
+                              output_error, execute_in_peers, runcli)
+
+from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+                                  WEBHOOKS_FILE,
+                                  DEFAULT_CONFIG_FILE,
+                                  CUSTOM_CONFIG_FILE,
+                                  CUSTOM_CONFIG_FILE_TO_SYNC,
+                                  EVENTSD,
+                                  CONFIG_KEYS,
+                                  BOOL_CONFIGS,
+                                  RESTART_CONFIGS)
+
+
+def file_content_overwrite(fname, data):
+    with open(fname + ".tmp", "w") as f:
+        f.write(json.dumps(data))
+
+    os.rename(fname + ".tmp", fname)
+
+
+def create_custom_config_file_if_not_exists():
+    mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE))
+    if not os.path.exists(CUSTOM_CONFIG_FILE):
+        with open(CUSTOM_CONFIG_FILE, "w") as f:
+            f.write("{}")
+
+
+def create_webhooks_file_if_not_exists():
+    mkdirp(os.path.dirname(WEBHOOKS_FILE))
+    if not os.path.exists(WEBHOOKS_FILE):
+        with open(WEBHOOKS_FILE, "w") as f:
+            f.write("{}")
+
+
+def boolify(value):
+    val = False
+    if value.lower() in ["enabled", "true", "on", "yes"]:
+        val = True
+    return val
+
+
+def mkdirp(path, exit_on_err=False, logger=None):
+    """
+    Try creating required directory structure
+    ignore EEXIST and raise exception for rest of the errors.
+    Print error to stderr and exit
+    """
+    try:
+        os.makedirs(path)
+    except (OSError, IOError) as e:
+        if e.errno == EEXIST and os.path.isdir(path):
+            pass
+        else:
+            output_error("Failed to create dir %s: %s" % (path, e))
+
+
+def is_enabled(service):
+    rc, out, err = execute(["systemctl", "is-enabled", service])
+    return rc == 0
+
+
+def is_active(service):
+    rc, out, err = execute(["systemctl", "is-active", service])
+    return rc == 0
+
+
+def enable_service(service):
+    if not is_enabled(service):
+        cmd = ["systemctl", "enable", service]
+        return execute(cmd)
+
+    return (0, "", "")
+
+
+def disable_service(service):
+    if is_enabled(service):
+        cmd = ["systemctl", "disable", service]
+        return execute(cmd)
+
+    return (0, "", "")
+
+
+def start_service(service):
+    rc, out, err = enable_service(service)
+    if rc != 0:
+        return (rc, out, err)
+
+    cmd = ["systemctl", "start", service]
+    return execute(cmd)
+
+
+def stop_service(service):
+    rc, out, err = disable_service(service)
+    if rc != 0:
+        return (rc, out, err)
+
+    cmd = ["systemctl", "stop", service]
+    return execute(cmd)
+
+
+def restart_service(service):
+    rc, out, err = stop_service(service)
+    if rc != 0:
+        return (rc, out, err)
+
+    return start_service(service)
+
+
+def reload_service(service):
+    if is_active(service):
+        cmd = ["systemctl", "reload", service]
+        return execute(cmd)
+
+    return (0, "", "")
+
+
+def sync_to_peers(restart=False):
+    if os.path.exists(WEBHOOKS_FILE):
+        try:
+            sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
+        except GlusterCmdException as e:
+            output_error("Failed to sync Webhooks file: [Error: {0}]"
+                         "{1}".format(e[0], e[2]))
+
+    if os.path.exists(CUSTOM_CONFIG_FILE):
+        try:
+            sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
+        except GlusterCmdException as e:
+            output_error("Failed to sync Config file: [Error: {0}]"
+                         "{1}".format(e[0], e[2]))
+
+    action = "node-reload"
+    if restart:
+        action = "node-restart"
+
+    out = execute_in_peers(action)
+    table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
+    table.align["NODE STATUS"] = "r"
+    table.align["SYNC STATUS"] = "r"
+
+    for p in out:
+        table.add_row([p.hostname,
+                       "UP" if p.node_up else "DOWN",
+                       "OK" if p.ok else "NOT OK: {0}".format(
+                           p.error)])
+
+    print (table)
+
+
+def node_output_handle(resp):
+    rc, out, err = resp
+    if rc == 0:
+        node_output_ok(out)
+    else:
+        node_output_notok(err)
+
+
+def action_handle(action):
+    out = execute_in_peers("node-" + action)
+    column_name = action.upper()
+    if action == "status":
+        column_name = EVENTSD.upper()
+
+    table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
+    table.align["NODE STATUS"] = "r"
+    table.align[column_name + " STATUS"] = "r"
+
+    for p in out:
+        status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
+            p.error)
+        if action == "status":
+            status_col_val = "DOWN"
+            if p.ok:
+                status_col_val = p.output
+
+        table.add_row([p.hostname,
+                       "UP" if p.node_up else "DOWN",
+                       status_col_val])
+
+    print (table)
+
+
+class NodeStart(Cmd):
+    name = "node-start"
+
+    def run(self, args):
+        node_output_handle(start_service(EVENTSD))
+
+
+class StartCmd(Cmd):
+    name = "start"
+
+    def run(self, args):
+        action_handle("start")
+
+
+class NodeStop(Cmd):
+    name = "node-stop"
+
+    def run(self, args):
+        node_output_handle(stop_service(EVENTSD))
+
+
+class StopCmd(Cmd):
+    name = "stop"
+
+    def run(self, args):
+        action_handle("stop")
+
+
+class NodeRestart(Cmd):
+    name = "node-restart"
+
+    def run(self, args):
+        node_output_handle(restart_service(EVENTSD))
+
+
+class RestartCmd(Cmd):
+    name = "restart"
+
+    def run(self, args):
+        action_handle("restart")
+
+
+class NodeReload(Cmd):
+    name = "node-reload"
+
+    def run(self, args):
+        node_output_handle(reload_service(EVENTSD))
+
+
+class ReloadCmd(Cmd):
+    name = "reload"
+
+    def run(self, args):
+        action_handle("reload")
+
+
+class NodeStatus(Cmd):
+    name = "node-status"
+
+    def run(self, args):
+        node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
+
+
+class StatusCmd(Cmd):
+    name = "status"
+
+    def run(self, args):
+        webhooks = {}
+        if os.path.exists(WEBHOOKS_FILE):
+            webhooks = json.load(open(WEBHOOKS_FILE))
+
+        print ("Webhooks: " + ("" if webhooks else "None"))
+        for w in webhooks:
+            print (w)
+
+        print ()
+        action_handle("status")
+
+
+class WebhookAddCmd(Cmd):
+    name = "webhook-add"
+
+    def args(self, parser):
+        parser.add_argument("url", help="URL of Webhook")
+        parser.add_argument("--bearer_token", "-t", help="Bearer Token",
+                            default="")
+
+    def run(self, args):
+        create_webhooks_file_if_not_exists()
+
+        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+            data = json.load(open(WEBHOOKS_FILE))
+            if data.get(args.url, None) is not None:
+                output_error("Webhook already exists")
+
+            data[args.url] = args.bearer_token
+            file_content_overwrite(WEBHOOKS_FILE, data)
+
+        sync_to_peers()
+
+
+class WebhookModCmd(Cmd):
+    name = "webhook-mod"
+
+    def args(self, parser):
+        parser.add_argument("url", help="URL of Webhook")
+        parser.add_argument("--bearer_token", "-t", help="Bearer Token",
+                            default="")
+
+    def run(self, args):
+        create_webhooks_file_if_not_exists()
+
+        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+            data = json.load(open(WEBHOOKS_FILE))
+            if data.get(args.url, None) is None:
+                output_error("Webhook does not exist")
+
+            data[args.url] = args.bearer_token
+            file_content_overwrite(WEBHOOKS_FILE, data)
+
+        sync_to_peers()
+
+
+class WebhookDelCmd(Cmd):
+    name = "webhook-del"
+
+    def args(self, parser):
+        parser.add_argument("url", help="URL of Webhook")
+
+    def run(self, args):
+        create_webhooks_file_if_not_exists()
+
+        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+            data = json.load(open(WEBHOOKS_FILE))
+            if data.get(args.url, None) is None:
+                output_error("Webhook does not exist")
+
+            del data[args.url]
+            file_content_overwrite(WEBHOOKS_FILE, data)
+
+        sync_to_peers()
+
+
+class NodeWebhookTestCmd(Cmd):
+    name = "node-webhook-test"
+
+    def args(self, parser):
+        parser.add_argument("url")
+        parser.add_argument("bearer_token")
+
+    def run(self, args):
+        http_headers = {}
+        if args.bearer_token != ".":
+            http_headers["Authorization"] = "Bearer " + args.bearer_token
+
+        try:
+            resp = requests.post(args.url, headers=http_headers)
+        except requests.ConnectionError as e:
+            node_output_notok("{0}".format(e))
+
+        if resp.status_code != 200:
+            node_output_notok("{0}".format(resp.status_code))
+
+        node_output_ok()
+
+
+class WebhookTestCmd(Cmd):
+    name = "webhook-test"
+
+    def args(self, parser):
+        parser.add_argument("url", help="URL of Webhook")
+        parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+
+    def run(self, args):
+        url = args.url
+        bearer_token = args.bearer_token
+        if not args.url:
+            url = "."
+        if not args.bearer_token:
+            bearer_token = "."
+
+        out = execute_in_peers("node-webhook-test", [url, bearer_token])
+
+        table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
+        table.align["NODE STATUS"] = "r"
+        table.align["WEBHOOK STATUS"] = "r"
+
+        for p in out:
+            table.add_row([p.hostname,
+                           "UP" if p.node_up else "DOWN",
+                           "OK" if p.ok else "NOT OK: {0}".format(
+                               p.error)])
+
+        print (table)
+
+
+class ConfigGetCmd(Cmd):
+    name = "config-get"
+
+    def args(self, parser):
+        parser.add_argument("--name", help="Config Name")
+
+    def run(self, args):
+        data = json.load(open(DEFAULT_CONFIG_FILE))
+        if os.path.exists(CUSTOM_CONFIG_FILE):
+            data.update(json.load(open(CUSTOM_CONFIG_FILE)))
+
+        if args.name is not None and args.name not in CONFIG_KEYS:
+            output_error("Invalid Config item")
+
+        table = PrettyTable(["NAME", "VALUE"])
+        if args.name is None:
+            for k, v in data.items():
+                table.add_row([k, v])
+        else:
+            table.add_row([args.name, data[args.name]])
+
+        print (table)
+
+
+def read_file_content_json(fname):
+    content = "{}"
+    with open(fname) as f:
+        content = f.read()
+        if content.strip() == "":
+            content = "{}"
+
+    return json.loads(content)
+
+
+class ConfigSetCmd(Cmd):
+    name = "config-set"
+
+    def args(self, parser):
+        parser.add_argument("name", help="Config Name")
+        parser.add_argument("value", help="Config Value")
+
+    def run(self, args):
+        if args.name not in CONFIG_KEYS:
+            output_error("Invalid Config item")
+
+        with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+            data = json.load(open(DEFAULT_CONFIG_FILE))
+            if os.path.exists(CUSTOM_CONFIG_FILE):
+                config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
+                data.update(config_json)
+
+            # Do Nothing if same as previous value
+            if data[args.name] == args.value:
+                return
+
+            # TODO: Validate Value
+            create_custom_config_file_if_not_exists()
+            new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
+
+            v = args.value
+            if args.name in BOOL_CONFIGS:
+                v = boolify(args.value)
+
+            new_data[args.name] = v
+            file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
+
+            # If any value changed which requires restart of REST server
+            restart = False
+            if args.name in RESTART_CONFIGS:
+                restart = True
+
+            sync_to_peers(restart=restart)
+
+
+class ConfigResetCmd(Cmd):
+    name = "config-reset"
+
+    def args(self, parser):
+        parser.add_argument("name", help="Config Name or all")
+
+    def run(self, args):
+        with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+            changed_keys = []
+            data = {}
+            if os.path.exists(CUSTOM_CONFIG_FILE):
+                data = read_file_content_json(CUSTOM_CONFIG_FILE)
+
+            if not data:
+                return
+
+            if args.name.lower() == "all":
+                for k, v in data.items():
+                    changed_keys.append(k)
+
+                # Reset all keys
+                file_content_overwrite(CUSTOM_CONFIG_FILE, {})
+            else:
+                changed_keys.append(args.name)
+                del data[args.name]
+                file_content_overwrite(CUSTOM_CONFIG_FILE, data)
+
+            # If any value changed which requires restart of REST server
+            restart = False
+            for key in changed_keys:
+                if key in RESTART_CONFIGS:
+                    restart = True
+                    break
+
+            sync_to_peers(restart=restart)
+
+
+class SyncCmd(Cmd):
+    name = "sync"
+
+    def run(self, args):
+        sync_to_peers()
+
+
+if __name__ == "__main__":
+    runcli()
diff --git a/events/src/utils.py b/events/src/utils.py
new file mode 100644
index 00000000000..772221a1e25
--- /dev/null
+++ b/events/src/utils.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+#
+#  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+#  This file is part of GlusterFS.
+#
+#  This file is licensed to you under your choice of the GNU Lesser
+#  General Public License, version 3 or any later version (LGPLv3 or
+#  later), or the GNU General Public License, version 2 (GPLv2), in all
+#  cases as published by the Free Software Foundation.
+#
+
+import json
+import os
+import logging
+
+import requests
+from eventsapiconf import (LOG_FILE,
+                           WEBHOOKS_FILE,
+                           DEFAULT_CONFIG_FILE,
+                           CUSTOM_CONFIG_FILE)
+import eventtypes
+
+from gluster.cliutils import get_node_uuid
+
+
+# Webhooks list
+_webhooks = {}
+# Default Log Level
+_log_level = "INFO"
+# Config Object
+_config = {}
+
+# Init Logger instance
+logger = logging.getLogger(__name__)
+
+
+def get_event_type_name(idx):
+    """
+    Returns Event Type text from the index. For example, VOLUME_CREATE
+    """
+    return eventtypes.all_events[idx].replace("EVENT_", "")
+
+
+def setup_logger():
+    """
+    Logging initialization, Log level by default will be INFO, once config
+    file is read, respective log_level will be set.
+    """
+    global logger
+    logger.setLevel(logging.INFO)
+
+    # create the logging file handler
+    fh = logging.FileHandler(LOG_FILE)
+
+    formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
+                                  "[%(module)s - %(lineno)s:%(funcName)s] "
+                                  "- %(message)s")
+
+    fh.setFormatter(formatter)
+
+    # add handler to logger object
+    logger.addHandler(fh)
+
+
+def load_config():
+    """
+    Load/Reload the config from REST Config files. This function will
+    be triggered during init and when SIGUSR2 is received.
+    """
+    global _config
+    _config = {}
+    if os.path.exists(DEFAULT_CONFIG_FILE):
+        _config = json.load(open(DEFAULT_CONFIG_FILE))
+    if os.path.exists(CUSTOM_CONFIG_FILE):
+        _config.update(json.load(open(CUSTOM_CONFIG_FILE)))
+
+
+def load_log_level():
+    """
+    Reads log_level from Config file and sets accordingly. This function will
+    be triggered during init and when SIGUSR2 is received.
+    """
+    global logger, _log_level
+    new_log_level = _config.get("log_level", "INFO")
+    if _log_level != new_log_level:
+        logger.setLevel(getattr(logging, new_log_level.upper()))
+        _log_level = new_log_level.upper()
+
+
+def load_webhooks():
+    """
+    Load/Reload the webhooks list. This function will
+    be triggered during init and when SIGUSR2 is received.
+    """
+    global _webhooks
+    _webhooks = {}
+    if os.path.exists(WEBHOOKS_FILE):
+        _webhooks = json.load(open(WEBHOOKS_FILE))
+
+
+def load_all():
+    """
+    Wrapper function to call all load/reload functions. This function will
+    be triggered during init and when SIGUSR2 is received.
+    """
+    load_config()
+    load_webhooks()
+    load_log_level()
+
+
+def publish(ts, event_key, data):
+    message = {
+        "nodeid": get_node_uuid(),
+        "ts": int(ts),
+        "event": get_event_type_name(event_key),
+        "message": data
+    }
+    if _webhooks:
+        plugin_webhook(message)
+    else:
+        # TODO: Default action?
+        pass
+
+
+def plugin_webhook(message):
+    message_json = json.dumps(message, sort_keys=True)
+    logger.debug("EVENT: {0}".format(message_json))
+    for url, token in _webhooks.items():
+        http_headers = {"Content-Type": "application/json"}
+        if token != "" and token is not None:
+            http_headers["Authorization"] = "Bearer " + token
+
+        try:
+            resp = requests.post(url, headers=http_headers, data=message_json)
+        except requests.ConnectionError as e:
+            logger.warn("Event push failed to URL: {url}, "
+                        "Event: {event}, "
+                        "Status: {error}".format(
+                            url=url,
+                            event=message_json,
+                            error=e))
+            continue
+
+        if resp.status_code != 200:
+            logger.warn("Event push failed to URL: {url}, "
+                        "Event: {event}, "
+                        "Status Code: {status_code}".format(
+                            url=url,
+                            event=message_json,
+                            status_code=resp.status_code))
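For reference, utils.publish() wraps each event as JSON with the keys nodeid, ts, event and message, and plugin_webhook() POSTs that document (Content-Type: application/json, plus an optional Bearer token) to every URL registered with gluster-eventsapi webhook-add. A minimal receiver sketch, written for Python 2 to match the patch and using an arbitrary port 9000:

import json
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer


class EventsListener(BaseHTTPRequestHandler):
    def do_POST(self):
        # plugin_webhook() sends the event as a JSON request body
        length = int(self.headers.getheader('content-length'))
        event = json.loads(self.rfile.read(length))
        # Keys set by utils.publish(): nodeid, ts, event, message
        print("{0} from {1}: {2}".format(event["event"],
                                         event["nodeid"],
                                         event["message"]))
        # plugin_webhook() treats any status other than 200 as a failure
        self.send_response(200)
        self.end_headers()


if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 9000), EventsListener).serve_forever()

Register the endpoint with "gluster-eventsapi webhook-add http://<host>:9000/" and verify reachability from all peers with "gluster-eventsapi webhook-test http://<host>:9000/".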
