diff options
Diffstat (limited to 'events/src/utils.py')
-rw-r--r-- | events/src/utils.py | 45 |
1 files changed, 36 insertions, 9 deletions
diff --git a/events/src/utils.py b/events/src/utils.py index 2a77b13d502..5130720d529 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -13,10 +13,11 @@ import json import os import logging import fcntl -from errno import ESRCH, EBADF +from errno import EBADF from threading import Thread import multiprocessing from Queue import Queue +from datetime import datetime, timedelta from eventsapiconf import (LOG_FILE, WEBHOOKS_FILE, @@ -183,15 +184,33 @@ def autoload_webhooks(): load_webhooks() -def publish_to_webhook(url, token, message_queue): +def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60): + import jwt + payload = { + "exp": datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds), + "iss": "gluster", + "sub": event_type, + "iat": event_ts + } + return jwt.encode(payload, secret, algorithm='HS256') + + +def publish_to_webhook(url, token, secret, message_queue): # Import requests here since not used in any other place import requests http_headers = {"Content-Type": "application/json"} while True: - message_json = message_queue.get() + hashval = "" + event_type, event_ts, message_json = message_queue.get() if token != "" and token is not None: - http_headers["Authorization"] = "Bearer " + token + hashval = token + + if secret != "" and secret is not None: + hashval = get_jwt_token(secret, event_type, event_ts) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval try: resp = requests.post(url, headers=http_headers, data=message_json) @@ -218,7 +237,7 @@ def publish_to_webhook(url, token, message_queue): def plugin_webhook(message): message_json = json.dumps(message, sort_keys=True) logger.debug("EVENT: {0}".format(message_json)) - webhooks_pool.send(message_json) + webhooks_pool.send(message["event"], message["ts"], message_json) class LockedOpen(object): @@ -298,9 +317,17 @@ class PidFile(object): def webhook_monitor(proc_queue, webhooks): queues = {} - for url, token in webhooks.items(): + for url, data in webhooks.items(): + if isinstance(data, str) or isinstance(data, unicode): + token = data + secret = None + else: + token = data["token"] + secret = data["secret"] + queues[url] = Queue() - t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) + t = Thread(target=publish_to_webhook, args=(url, token, secret, + queues[url])) t.start() # Get the message sent to Process queue and distribute to all thread queues @@ -329,8 +356,8 @@ class WebhookThreadPool(object): self.proc.terminate() self.start() - def send(self, message): - self.queue.put(message) + def send(self, event_type, event_ts, message): + self.queue.put((event_type, event_ts, message)) def init_webhook_pool(): |