diff options
| author | Aravinda VK <avishwan@redhat.com> | 2016-11-29 16:32:54 +0530 | 
|---|---|---|
| committer | Aravinda VK <avishwan@redhat.com> | 2016-12-02 06:52:55 -0800 | 
| commit | 99458974b7d12bd12d78d4b9a19adfb62d771b5c (patch) | |
| tree | e1fe73d5ec323549b7ebace3b62d942099704047 /events | |
| parent | 4536f7bdf16f8286d67598eda9a46c029f0c0bf4 (diff) | |
eventsapi: Push Messages to Webhooks in parallel
With this patch, glustereventsd will maintain one thread per
webhook. If a webhook is slow, then all the events to that worker
will be delayed but it will not affect the other webhooks.
Note: Webhook in transit may get missed if glustereventsd reloads due to
new Webhook addition or due configuration changes.
BUG: 1357754
Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15966
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Diffstat (limited to 'events')
| -rw-r--r-- | events/src/glustereventsd.py | 2 | ||||
| -rw-r--r-- | events/src/utils.py | 70 | 
2 files changed, 67 insertions, 5 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 86e64b01ad5..4b56eee9131 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -80,11 +80,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):  def signal_handler_sigusr2(sig, frame):      utils.load_all() +    utils.restart_webhook_pool()  def init_event_server():      utils.setup_logger()      utils.load_all() +    utils.init_webhook_pool()      port = utils.get_config("port")      if port is None: diff --git a/events/src/utils.py b/events/src/utils.py index e69d6577ff0..256cfca0fc2 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -14,6 +14,9 @@ import os  import logging  import fcntl  from errno import ESRCH, EBADF +from threading import Thread +import multiprocessing +from Queue import Queue  from eventsapiconf import (LOG_FILE,                             WEBHOOKS_FILE, @@ -34,6 +37,7 @@ _config = {}  # Init Logger instance  logger = logging.getLogger(__name__)  NodeID = None +webhooks_pool = None  def boolify(value): @@ -170,14 +174,13 @@ def autoload_webhooks():              load_webhooks() -def plugin_webhook(message): +def publish_to_webhook(url, token, message_queue):      # Import requests here since not used in any other place      import requests -    message_json = json.dumps(message, sort_keys=True) -    logger.debug("EVENT: {0}".format(message_json)) -    for url, token in _webhooks.items(): -        http_headers = {"Content-Type": "application/json"} +    http_headers = {"Content-Type": "application/json"} +    while True: +        message_json = message_queue.get()          if token != "" and token is not None:              http_headers["Authorization"] = "Bearer " + token @@ -191,6 +194,8 @@ def plugin_webhook(message):                              event=message_json,                              error=e))              continue +        finally: +            message_queue.task_done()          if resp.status_code != 200:              logger.warn("Event push failed to URL: {url}, " @@ -201,6 +206,12 @@ def plugin_webhook(message):                              status_code=resp.status_code)) +def plugin_webhook(message): +    message_json = json.dumps(message, sort_keys=True) +    logger.debug("EVENT: {0}".format(message_json)) +    webhooks_pool.send(message_json) + +  class LockedOpen(object):      def __init__(self, filename, *args, **kwargs): @@ -274,3 +285,52 @@ class PidFile(object):      def __exit__(self, _exc_type, _exc_value, _traceback):          self.cleanup() + + +def webhook_monitor(proc_queue, webhooks): +    queues = {} +    for url, token in webhooks.items(): +        queues[url] = Queue() +        t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) +        t.start() + +    # Get the message sent to Process queue and distribute to all thread queues +    while True: +        message = proc_queue.get() +        for _, q in queues.items(): +            q.put(message) + + +class WebhookThreadPool(object): +    def start(self): +        # Seperate process to emit messages to webhooks +        # which maintains one thread per webhook. Seperate +        # process is required since on reload we need to stop +        # and start the thread pool. In Python Threads can't be stopped +        # so terminate the process and start again. Note: In transit +        # events will be missed during reload +        self.queue = multiprocessing.Queue() +        self.proc = multiprocessing.Process(target=webhook_monitor, +                                            args=(self.queue, _webhooks)) +        self.proc.start() + +    def restart(self): +        # In transit messages are skipped, since webhooks monitor process +        # is terminated. +        self.proc.terminate() +        self.start() + +    def send(self, message): +        self.queue.put(message) + + +def init_webhook_pool(): +    global webhooks_pool +    webhooks_pool = WebhookThreadPool() +    webhooks_pool.start() + + +def restart_webhook_pool(): +    global webhooks_pool +    if webhooks_pool is not None: +        webhooks_pool.restart()  | 
