summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/Makefile.am8
-rw-r--r--events/src/eventsapiconf.py.in27
-rw-r--r--events/src/gf_event.py19
-rw-r--r--events/src/glustereventsd.py51
-rw-r--r--events/src/peer_eventsapi.py133
-rw-r--r--events/src/utils.py178
6 files changed, 317 insertions, 99 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
index 8493abdcd82..3b229691897 100644
--- a/events/src/Makefile.am
+++ b/events/src/Makefile.am
@@ -5,9 +5,13 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \
BUILT_SOURCES = eventtypes.py
CLEANFILES = eventtypes.py
-eventsdir = $(GLUSTERFS_LIBEXECDIR)/events
+eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents
+if BUILD_EVENTS
events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \
utils.py
+endif
+# this does not work, see the Makefile.am in the root for a workaround
+#nodist_events_PYTHON = eventtypes.py
eventtypes.py: $(top_srcdir)/events/eventskeygen.py
$(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER
@@ -24,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
- ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 08a3602f567..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,12 +9,32 @@
# cases as published by the Free Software Foundation.
#
+import subprocess
+glusterd_workdir = None
+
+# Methods
+def get_glusterd_workdir():
+ global glusterd_workdir
+ if glusterd_workdir is not None:
+ return glusterd_workdir
+ proc = subprocess.Popen(["gluster", "system::", "getwd"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines = True)
+ out, err = proc.communicate()
+ if proc.returncode == 0:
+ glusterd_workdir = out.strip()
+ else:
+ glusterd_workdir = "@GLUSTERD_WORKDIR@"
+ return glusterd_workdir
+
SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
-CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
-WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
CONFIG_KEYS = ["log-level", "port", "disable-events-log"]
@@ -22,10 +42,11 @@ BOOL_CONFIGS = ["disable-events-log"]
INT_CONFIGS = ["port"]
RESTART_CONFIGS = ["port"]
EVENTS_ENABLED = @EVENTS_ENABLED@
-UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
+UUID_FILE = get_glusterd_workdir() + "/glusterd.info"
PID_FILE = "@localstatedir@/run/glustereventsd.pid"
AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"]
AUTO_INT_ATTRIBUTES = ["ssh-port"]
+CERTS_DIR = get_glusterd_workdir() + "/events"
# Errors
ERROR_SAME_CONFIG = 2
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index f9ece6adc28..260b0d9aa48 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,10 +12,10 @@
import socket
import time
-from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
-from eventtypes import all_events
+from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
+from gfevents.eventtypes import all_events
-from utils import logger, setup_logger, get_config
+from gfevents.utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
@@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs):
logger.error("Unable to connect to events Server: {0}".format(e))
return
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
+ return
+
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
msg = ""
for k, v in kwargs.items():
msg += "{0}={1};".format(k, v)
# <TIMESTAMP> <EVENT_TYPE> <MSG>
- msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
-
- port = get_config("port")
- if port is None:
- logger.error("Unable to get eventsd port details")
- return
+ msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode()
try:
sent = client.sendto(msg, (SERVER_ADDRESS, port))
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 606b89cbd7f..341a3b60947 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,22 +13,36 @@
from __future__ import print_function
import sys
import signal
-import SocketServer
+import threading
+try:
+ import socketserver
+except ImportError:
+ import SocketServer as socketserver
import socket
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS, PID_FILE
+from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE
from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES
from utils import logger, PidFile, PidFileLockFailed, boolify
+# Subclass so that specifically IPv4 packets are captured
+class UDPServerv4(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET
-class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
+# Subclass so that specifically IPv6 packets are captured
+class UDPServerv6(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET6
+
+class GlusterEventsRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
+ if sys.version_info >= (3,):
+ data = self.request[0].strip().decode("utf-8")
+
logger.debug("EVENT: {0} from {1}".format(repr(data),
self.client_address[0]))
try:
@@ -46,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
logger.warn("Unable to parse Event {0}".format(data))
return
- for k, v in data_dict.iteritems():
+ for k, v in data_dict.items():
try:
if k in AUTO_BOOL_ATTRIBUTES:
data_dict[k] = boolify(v)
@@ -83,6 +97,10 @@ def signal_handler_sigusr2(sig, frame):
utils.restart_webhook_pool()
+def UDP_server_thread(sock):
+ sock.serve_forever()
+
+
def init_event_server():
utils.setup_logger()
utils.load_all()
@@ -93,15 +111,26 @@ def init_event_server():
sys.stderr.write("Unable to get Port details from Config\n")
sys.exit(1)
- # Start the Eventing Server, UDP Server
+ # Creating the Eventing Server, UDP Server for IPv4 packets
+ try:
+ serverv4 = UDPServerv4((SERVER_ADDRESSv4, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e))
+ sys.exit(1)
+ # Creating the Eventing Server, UDP Server for IPv6 packets
try:
- server = SocketServer.ThreadingUDPServer(
- (SERVER_ADDRESS, port),
- GlusterEventsRequestHandler)
+ serverv6 = UDPServerv6((SERVER_ADDRESSv6, port),
+ GlusterEventsRequestHandler)
except socket.error as e:
- sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e))
sys.exit(1)
- server.serve_forever()
+ server_thread1 = threading.Thread(target=UDP_server_thread,
+ args=(serverv4,))
+ server_thread2 = threading.Thread(target=UDP_server_thread,
+ args=(serverv6,))
+ server_thread1.start()
+ server_thread2.start()
def get_args():
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index 59808ada539..4d2e5f35b1c 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -18,6 +18,7 @@ import fcntl
from errno import EACCES, EAGAIN
import signal
import sys
+import time
import requests
from prettytable import PrettyTable
@@ -26,27 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
output_error, execute_in_peers, runcli,
set_common_args_func)
-from events.utils import LockedOpen
-
-from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- CUSTOM_CONFIG_FILE_TO_SYNC,
- EVENTSD,
- CONFIG_KEYS,
- BOOL_CONFIGS,
- INT_CONFIGS,
- PID_FILE,
- RESTART_CONFIGS,
- ERROR_INVALID_CONFIG,
- ERROR_WEBHOOK_NOT_EXISTS,
- ERROR_CONFIG_SYNC_FAILED,
- ERROR_WEBHOOK_ALREADY_EXISTS,
- ERROR_PARTIAL_SUCCESS,
- ERROR_ALL_NODES_STATUS_NOT_OK,
- ERROR_SAME_CONFIG,
- ERROR_WEBHOOK_SYNC_FAILED)
+from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert
+
+from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ INT_CONFIGS,
+ PID_FILE,
+ RESTART_CONFIGS,
+ ERROR_INVALID_CONFIG,
+ ERROR_WEBHOOK_NOT_EXISTS,
+ ERROR_CONFIG_SYNC_FAILED,
+ ERROR_WEBHOOK_ALREADY_EXISTS,
+ ERROR_PARTIAL_SUCCESS,
+ ERROR_ALL_NODES_STATUS_NOT_OK,
+ ERROR_SAME_CONFIG,
+ ERROR_WEBHOOK_SYNC_FAILED,
+ CERTS_DIR)
def handle_output_error(err, errcode=1, json_output=False):
@@ -171,8 +173,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_WEBHOOK_SYNC_FAILED,
json_output=args.json)
@@ -180,8 +184,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Config file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_CONFIG_SYNC_FAILED,
json_output=args.json)
@@ -307,6 +313,8 @@ class WebhookAddCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
create_webhooks_file_if_not_exists(args)
@@ -318,7 +326,8 @@ class WebhookAddCmd(Cmd):
errcode=ERROR_WEBHOOK_ALREADY_EXISTS,
json_output=args.json)
- data[args.url] = args.bearer_token
+ data[args.url] = {"token": args.bearer_token,
+ "secret": args.secret}
file_content_overwrite(WEBHOOKS_FILE, data)
sync_to_peers(args)
@@ -331,6 +340,8 @@ class WebhookModCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
create_webhooks_file_if_not_exists(args)
@@ -342,7 +353,15 @@ class WebhookModCmd(Cmd):
errcode=ERROR_WEBHOOK_NOT_EXISTS,
json_output=args.json)
- data[args.url] = args.bearer_token
+ if isinstance(data[args.url], str):
+ data[args.url]["token"] = data[args.url]
+
+ if args.bearer_token != "":
+ data[args.url]["token"] = args.bearer_token
+
+ if args.secret != "":
+ data[args.url]["secret"] = args.secret
+
file_content_overwrite(WEBHOOKS_FILE, data)
sync_to_peers(args)
@@ -376,18 +395,57 @@ class NodeWebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url")
parser.add_argument("bearer_token")
+ parser.add_argument("secret")
def run(self, args):
http_headers = {}
+ hashval = ""
if args.bearer_token != ".":
- http_headers["Authorization"] = "Bearer " + args.bearer_token
+ hashval = args.bearer_token
- try:
- resp = requests.post(args.url, headers=http_headers)
- except requests.ConnectionError as e:
- node_output_notok("{0}".format(e))
- except requests.exceptions.InvalidSchema as e:
- node_output_notok("{0}".format(e))
+ if args.secret != ".":
+ hashval = get_jwt_token(args.secret, "TEST", int(time.time()))
+
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ urldata = requests.utils.urlparse(args.url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip())
+ verify = True
+ while True:
+ try:
+ resp = requests.post(args.url, headers=http_headers,
+ verify=verify)
+ # Successful webhook push
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception:
+ verify = False
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ node_output_notok("{0}".format(e))
+ break
if resp.status_code != 200:
node_output_notok("{0}".format(resp.status_code))
@@ -401,16 +459,23 @@ class WebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+ parser.add_argument("--secret", "-s",
+ help="Secret to generate Bearer Token")
def run(self, args):
url = args.url
bearer_token = args.bearer_token
+ secret = args.secret
+
if not args.url:
url = "."
if not args.bearer_token:
bearer_token = "."
+ if not args.secret:
+ secret = "."
- out = execute_in_peers("node-webhook-test", [url, bearer_token])
+ out = execute_in_peers("node-webhook-test", [url, bearer_token,
+ secret])
if not args.json:
table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
diff --git a/events/src/utils.py b/events/src/utils.py
index 2a77b13d502..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,21 +9,34 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
import fcntl
-from errno import ESRCH, EBADF
+from errno import EBADF
from threading import Thread
import multiprocessing
-from Queue import Queue
-
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- UUID_FILE)
-import eventtypes
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
@@ -86,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -183,42 +196,121 @@ def autoload_webhooks():
load_webhooks()
-def publish_to_webhook(url, token, message_queue):
+def base64_urlencode(inp):
+ return base64.urlsafe_b64encode(inp).replace("=", "").strip()
+
+
+def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
+ exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
+ payload = {
+ "exp": timegm(exp.utctimetuple()),
+ "iss": "gluster",
+ "sub": event_type,
+ "iat": event_ts
+ }
+ header = '{"alg":"HS256","typ":"JWT"}'
+ payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
+ msg = base64_urlencode(header) + "." + base64_urlencode(payload)
+ return "%s.%s" % (
+ msg,
+ base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
+ )
+
+
+def save_https_cert(domain, port, cert_path):
+ import ssl
+
+ # Cert file already available for this URL
+ if os.path.exists(cert_path):
+ return
+
+ cert_data = ssl.get_server_certificate((domain, port))
+ with open(cert_path, "w") as f:
+ f.write(cert_data)
+
+
+def publish_to_webhook(url, token, secret, message_queue):
# Import requests here since not used in any other place
import requests
http_headers = {"Content-Type": "application/json"}
+ urldata = requests.utils.urlparse(url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())
+
while True:
- message_json = message_queue.get()
+ hashval = ""
+ event_type, event_ts, message_json = message_queue.get()
if token != "" and token is not None:
- http_headers["Authorization"] = "Bearer " + token
+ hashval = token
- try:
- resp = requests.post(url, headers=http_headers, data=message_json)
- except requests.ConnectionError as e:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status: {error}".format(
- url=url,
- event=message_json,
- error=e))
- continue
- finally:
- message_queue.task_done()
+ if secret != "" and secret is not None:
+ hashval = get_jwt_token(secret, event_type, event_ts)
- if resp.status_code != 200:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status Code: {status_code}".format(
- url=url,
- event=message_json,
- status_code=resp.status_code))
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ verify = True
+ while True:
+ try:
+ resp = requests.post(url, headers=http_headers,
+ data=message_json,
+ verify=verify)
+ # Successful webhook push
+ message_queue.task_done()
+ if resp.status_code != 200:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status Code: {status_code}".format(
+ url=url,
+ event=message_json,
+ status_code=resp.status_code))
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ logger.warn("Event push failed with certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, e))
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception as ex:
+ verify = False
+ logger.warn("Unable to get Server certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, ex))
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status: {error}".format(
+ url=url,
+ event=message_json,
+ error=e))
+ message_queue.task_done()
+ break
def plugin_webhook(message):
message_json = json.dumps(message, sort_keys=True)
logger.debug("EVENT: {0}".format(message_json))
- webhooks_pool.send(message_json)
+ webhooks_pool.send(message["event"], message["ts"], message_json)
class LockedOpen(object):
@@ -298,9 +390,17 @@ class PidFile(object):
def webhook_monitor(proc_queue, webhooks):
queues = {}
- for url, token in webhooks.items():
+ for url, data in webhooks.items():
+ if isinstance(data, str):
+ token = data
+ secret = None
+ else:
+ token = data["token"]
+ secret = data["secret"]
+
queues[url] = Queue()
- t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
+ t = Thread(target=publish_to_webhook, args=(url, token, secret,
+ queues[url]))
t.start()
# Get the message sent to Process queue and distribute to all thread queues
@@ -312,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks):
class WebhookThreadPool(object):
def start(self):
- # Seperate process to emit messages to webhooks
- # which maintains one thread per webhook. Seperate
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
# process is required since on reload we need to stop
# and start the thread pool. In Python Threads can't be stopped
# so terminate the process and start again. Note: In transit
@@ -329,8 +429,8 @@ class WebhookThreadPool(object):
self.proc.terminate()
self.start()
- def send(self, message):
- self.queue.put(message)
+ def send(self, event_type, event_ts, message):
+ self.queue.put((event_type, event_ts, message))
def init_webhook_pool():