summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/Makefile.am4
-rw-r--r--events/src/eventsapiconf.py.in5
-rw-r--r--events/src/gf_event.py19
-rw-r--r--events/src/glustereventsd.py46
-rw-r--r--events/src/peer_eventsapi.py57
-rw-r--r--events/src/utils.py26
6 files changed, 96 insertions, 61 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
index 4e83a469cc2..3b229691897 100644
--- a/events/src/Makefile.am
+++ b/events/src/Makefile.am
@@ -5,7 +5,7 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \
BUILT_SOURCES = eventtypes.py
CLEANFILES = eventtypes.py
-eventsdir = $(GLUSTERFS_LIBEXECDIR)/events
+eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents
if BUILD_EVENTS
events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \
utils.py
@@ -28,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
- ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 003cb981572..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -18,7 +18,8 @@ def get_glusterd_workdir():
if glusterd_workdir is not None:
return glusterd_workdir
proc = subprocess.Popen(["gluster", "system::", "getwd"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines = True)
out, err = proc.communicate()
if proc.returncode == 0:
glusterd_workdir = out.strip()
@@ -27,6 +28,8 @@ def get_glusterd_workdir():
return glusterd_workdir
SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index fd272434242..260b0d9aa48 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -1,4 +1,3 @@
-#!/usr/bin/python2
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,10 +12,10 @@
import socket
import time
-from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
-from eventtypes import all_events
+from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
+from gfevents.eventtypes import all_events
-from utils import logger, setup_logger, get_config
+from gfevents.utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
@@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs):
logger.error("Unable to connect to events Server: {0}".format(e))
return
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
+ return
+
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
msg = ""
for k, v in kwargs.items():
msg += "{0}={1};".format(k, v)
# <TIMESTAMP> <EVENT_TYPE> <MSG>
- msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
-
- port = get_config("port")
- if port is None:
- logger.error("Unable to get eventsd port details")
- return
+ msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode()
try:
sent = client.sendto(msg, (SERVER_ADDRESS, port))
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 014f5a0009d..341a3b60947 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,6 +13,7 @@
from __future__ import print_function
import sys
import signal
+import threading
try:
import socketserver
except ImportError:
@@ -23,15 +24,25 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS, PID_FILE
+from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE
from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES
from utils import logger, PidFile, PidFileLockFailed, boolify
+# Subclass so that specifically IPv4 packets are captured
+class UDPServerv4(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET
-class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
+# Subclass so that specifically IPv6 packets are captured
+class UDPServerv6(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET6
+
+class GlusterEventsRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
+ if sys.version_info >= (3,):
+ data = self.request[0].strip().decode("utf-8")
+
logger.debug("EVENT: {0} from {1}".format(repr(data),
self.client_address[0]))
try:
@@ -49,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
logger.warn("Unable to parse Event {0}".format(data))
return
- for k, v in data_dict.iteritems():
+ for k, v in data_dict.items():
try:
if k in AUTO_BOOL_ATTRIBUTES:
data_dict[k] = boolify(v)
@@ -86,6 +97,10 @@ def signal_handler_sigusr2(sig, frame):
utils.restart_webhook_pool()
+def UDP_server_thread(sock):
+ sock.serve_forever()
+
+
def init_event_server():
utils.setup_logger()
utils.load_all()
@@ -96,15 +111,26 @@ def init_event_server():
sys.stderr.write("Unable to get Port details from Config\n")
sys.exit(1)
- # Start the Eventing Server, UDP Server
+ # Creating the Eventing Server, UDP Server for IPv4 packets
+ try:
+ serverv4 = UDPServerv4((SERVER_ADDRESSv4, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e))
+ sys.exit(1)
+ # Creating the Eventing Server, UDP Server for IPv6 packets
try:
- server = SocketServer.ThreadingUDPServer(
- (SERVER_ADDRESS, port),
- GlusterEventsRequestHandler)
+ serverv6 = UDPServerv6((SERVER_ADDRESSv6, port),
+ GlusterEventsRequestHandler)
except socket.error as e:
- sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e))
sys.exit(1)
- server.serve_forever()
+ server_thread1 = threading.Thread(target=UDP_server_thread,
+ args=(serverv4,))
+ server_thread2 = threading.Thread(target=UDP_server_thread,
+ args=(serverv6,))
+ server_thread1.start()
+ server_thread2.start()
def get_args():
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index fb11e12deb4..4d2e5f35b1c 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -27,28 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
output_error, execute_in_peers, runcli,
set_common_args_func)
-from events.utils import LockedOpen, get_jwt_token, save_https_cert
-
-from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- CUSTOM_CONFIG_FILE_TO_SYNC,
- EVENTSD,
- CONFIG_KEYS,
- BOOL_CONFIGS,
- INT_CONFIGS,
- PID_FILE,
- RESTART_CONFIGS,
- ERROR_INVALID_CONFIG,
- ERROR_WEBHOOK_NOT_EXISTS,
- ERROR_CONFIG_SYNC_FAILED,
- ERROR_WEBHOOK_ALREADY_EXISTS,
- ERROR_PARTIAL_SUCCESS,
- ERROR_ALL_NODES_STATUS_NOT_OK,
- ERROR_SAME_CONFIG,
- ERROR_WEBHOOK_SYNC_FAILED,
- CERTS_DIR)
+from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert
+
+from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ INT_CONFIGS,
+ PID_FILE,
+ RESTART_CONFIGS,
+ ERROR_INVALID_CONFIG,
+ ERROR_WEBHOOK_NOT_EXISTS,
+ ERROR_CONFIG_SYNC_FAILED,
+ ERROR_WEBHOOK_ALREADY_EXISTS,
+ ERROR_PARTIAL_SUCCESS,
+ ERROR_ALL_NODES_STATUS_NOT_OK,
+ ERROR_SAME_CONFIG,
+ ERROR_WEBHOOK_SYNC_FAILED,
+ CERTS_DIR)
def handle_output_error(err, errcode=1, json_output=False):
@@ -173,8 +173,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_WEBHOOK_SYNC_FAILED,
json_output=args.json)
@@ -182,8 +184,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Config file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_CONFIG_SYNC_FAILED,
json_output=args.json)
@@ -349,8 +353,7 @@ class WebhookModCmd(Cmd):
errcode=ERROR_WEBHOOK_NOT_EXISTS,
json_output=args.json)
- if isinstance(data[args.url], str) or \
- isinstance(data[args.url], unicode):
+ if isinstance(data[args.url], str):
data[args.url]["token"] = data[args.url]
if args.bearer_token != "":
diff --git a/events/src/utils.py b/events/src/utils.py
index c1c0bccfe66..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,9 +9,11 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
import fcntl
from errno import EBADF
from threading import Thread
@@ -26,13 +28,15 @@ import hmac
from hashlib import sha256
from calendar import timegm
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- UUID_FILE,
- CERTS_DIR)
-import eventtypes
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
@@ -95,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -387,7 +391,7 @@ class PidFile(object):
def webhook_monitor(proc_queue, webhooks):
queues = {}
for url, data in webhooks.items():
- if isinstance(data, str) or isinstance(data, unicode):
+ if isinstance(data, str):
token = data
secret = None
else:
@@ -408,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks):
class WebhookThreadPool(object):
def start(self):
- # Seperate process to emit messages to webhooks
- # which maintains one thread per webhook. Seperate
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
# process is required since on reload we need to stop
# and start the thread pool. In Python Threads can't be stopped
# so terminate the process and start again. Note: In transit