summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/Makefile.am6
-rw-r--r--events/src/eventsapiconf.py.in28
-rw-r--r--events/src/gf_event.py19
-rw-r--r--events/src/glustereventsd.py51
-rw-r--r--events/src/peer_eventsapi.py57
-rw-r--r--events/src/utils.py33
6 files changed, 127 insertions, 67 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
index 4308ccdbb22..3b229691897 100644
--- a/events/src/Makefile.am
+++ b/events/src/Makefile.am
@@ -5,9 +5,11 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \
BUILT_SOURCES = eventtypes.py
CLEANFILES = eventtypes.py
-eventsdir = $(GLUSTERFS_LIBEXECDIR)/events
+eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents
+if BUILD_EVENTS
events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \
utils.py
+endif
# this does not work, see the Makefile.am in the root for a workaround
#nodist_events_PYTHON = eventtypes.py
@@ -26,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
- ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 687eaa39c00..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,12 +9,32 @@
# cases as published by the Free Software Foundation.
#
+import subprocess
+glusterd_workdir = None
+
+# Methods
+def get_glusterd_workdir():
+ global glusterd_workdir
+ if glusterd_workdir is not None:
+ return glusterd_workdir
+ proc = subprocess.Popen(["gluster", "system::", "getwd"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines = True)
+ out, err = proc.communicate()
+ if proc.returncode == 0:
+ glusterd_workdir = out.strip()
+ else:
+ glusterd_workdir = "@GLUSTERD_WORKDIR@"
+ return glusterd_workdir
+
SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
-CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
-WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
CONFIG_KEYS = ["log-level", "port", "disable-events-log"]
@@ -22,11 +42,11 @@ BOOL_CONFIGS = ["disable-events-log"]
INT_CONFIGS = ["port"]
RESTART_CONFIGS = ["port"]
EVENTS_ENABLED = @EVENTS_ENABLED@
-UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
+UUID_FILE = get_glusterd_workdir() + "/glusterd.info"
PID_FILE = "@localstatedir@/run/glustereventsd.pid"
AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"]
AUTO_INT_ATTRIBUTES = ["ssh-port"]
-CERTS_DIR = "@GLUSTERD_WORKDIR@/events"
+CERTS_DIR = get_glusterd_workdir() + "/events"
# Errors
ERROR_SAME_CONFIG = 2
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index f9ece6adc28..260b0d9aa48 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,10 +12,10 @@
import socket
import time
-from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
-from eventtypes import all_events
+from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
+from gfevents.eventtypes import all_events
-from utils import logger, setup_logger, get_config
+from gfevents.utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
@@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs):
logger.error("Unable to connect to events Server: {0}".format(e))
return
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
+ return
+
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
msg = ""
for k, v in kwargs.items():
msg += "{0}={1};".format(k, v)
# <TIMESTAMP> <EVENT_TYPE> <MSG>
- msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
-
- port = get_config("port")
- if port is None:
- logger.error("Unable to get eventsd port details")
- return
+ msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode()
try:
sent = client.sendto(msg, (SERVER_ADDRESS, port))
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 606b89cbd7f..341a3b60947 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,22 +13,36 @@
from __future__ import print_function
import sys
import signal
-import SocketServer
+import threading
+try:
+ import socketserver
+except ImportError:
+ import SocketServer as socketserver
import socket
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS, PID_FILE
+from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE
from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES
from utils import logger, PidFile, PidFileLockFailed, boolify
+# Subclass so that specifically IPv4 packets are captured
+class UDPServerv4(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET
-class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
+# Subclass so that specifically IPv6 packets are captured
+class UDPServerv6(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET6
+
+class GlusterEventsRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
+ if sys.version_info >= (3,):
+ data = self.request[0].strip().decode("utf-8")
+
logger.debug("EVENT: {0} from {1}".format(repr(data),
self.client_address[0]))
try:
@@ -46,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
logger.warn("Unable to parse Event {0}".format(data))
return
- for k, v in data_dict.iteritems():
+ for k, v in data_dict.items():
try:
if k in AUTO_BOOL_ATTRIBUTES:
data_dict[k] = boolify(v)
@@ -83,6 +97,10 @@ def signal_handler_sigusr2(sig, frame):
utils.restart_webhook_pool()
+def UDP_server_thread(sock):
+ sock.serve_forever()
+
+
def init_event_server():
utils.setup_logger()
utils.load_all()
@@ -93,15 +111,26 @@ def init_event_server():
sys.stderr.write("Unable to get Port details from Config\n")
sys.exit(1)
- # Start the Eventing Server, UDP Server
+ # Creating the Eventing Server, UDP Server for IPv4 packets
+ try:
+ serverv4 = UDPServerv4((SERVER_ADDRESSv4, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e))
+ sys.exit(1)
+ # Creating the Eventing Server, UDP Server for IPv6 packets
try:
- server = SocketServer.ThreadingUDPServer(
- (SERVER_ADDRESS, port),
- GlusterEventsRequestHandler)
+ serverv6 = UDPServerv6((SERVER_ADDRESSv6, port),
+ GlusterEventsRequestHandler)
except socket.error as e:
- sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e))
sys.exit(1)
- server.serve_forever()
+ server_thread1 = threading.Thread(target=UDP_server_thread,
+ args=(serverv4,))
+ server_thread2 = threading.Thread(target=UDP_server_thread,
+ args=(serverv6,))
+ server_thread1.start()
+ server_thread2.start()
def get_args():
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index d72fdbe99c4..4d2e5f35b1c 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -27,28 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
output_error, execute_in_peers, runcli,
set_common_args_func)
-from events.utils import LockedOpen, get_jwt_token, save_https_cert
-
-from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- CUSTOM_CONFIG_FILE_TO_SYNC,
- EVENTSD,
- CONFIG_KEYS,
- BOOL_CONFIGS,
- INT_CONFIGS,
- PID_FILE,
- RESTART_CONFIGS,
- ERROR_INVALID_CONFIG,
- ERROR_WEBHOOK_NOT_EXISTS,
- ERROR_CONFIG_SYNC_FAILED,
- ERROR_WEBHOOK_ALREADY_EXISTS,
- ERROR_PARTIAL_SUCCESS,
- ERROR_ALL_NODES_STATUS_NOT_OK,
- ERROR_SAME_CONFIG,
- ERROR_WEBHOOK_SYNC_FAILED,
- CERTS_DIR)
+from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert
+
+from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ INT_CONFIGS,
+ PID_FILE,
+ RESTART_CONFIGS,
+ ERROR_INVALID_CONFIG,
+ ERROR_WEBHOOK_NOT_EXISTS,
+ ERROR_CONFIG_SYNC_FAILED,
+ ERROR_WEBHOOK_ALREADY_EXISTS,
+ ERROR_PARTIAL_SUCCESS,
+ ERROR_ALL_NODES_STATUS_NOT_OK,
+ ERROR_SAME_CONFIG,
+ ERROR_WEBHOOK_SYNC_FAILED,
+ CERTS_DIR)
def handle_output_error(err, errcode=1, json_output=False):
@@ -173,8 +173,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_WEBHOOK_SYNC_FAILED,
json_output=args.json)
@@ -182,8 +184,10 @@ def sync_to_peers(args):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
handle_output_error("Failed to sync Config file: [Error: {0}]"
- "{1}".format(e[0], e[2]),
+ "{1}".format(e.message[0], errmsg),
errcode=ERROR_CONFIG_SYNC_FAILED,
json_output=args.json)
@@ -349,8 +353,7 @@ class WebhookModCmd(Cmd):
errcode=ERROR_WEBHOOK_NOT_EXISTS,
json_output=args.json)
- if isinstance(data[args.url], str) or \
- isinstance(data[args.url], unicode):
+ if isinstance(data[args.url], str):
data[args.url]["token"] = data[args.url]
if args.bearer_token != "":
diff --git a/events/src/utils.py b/events/src/utils.py
index f405e44ac70..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,27 +9,34 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
import fcntl
from errno import EBADF
from threading import Thread
import multiprocessing
-from Queue import Queue
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
from datetime import datetime, timedelta
import base64
import hmac
from hashlib import sha256
from calendar import timegm
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- UUID_FILE,
- CERTS_DIR)
-import eventtypes
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
@@ -92,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -206,7 +213,7 @@ def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
msg = base64_urlencode(header) + "." + base64_urlencode(payload)
return "%s.%s" % (
msg,
- base64_urlencode(hmac.HMAC(secret, msg, sha256).digest())
+ base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
)
@@ -384,7 +391,7 @@ class PidFile(object):
def webhook_monitor(proc_queue, webhooks):
queues = {}
for url, data in webhooks.items():
- if isinstance(data, str) or isinstance(data, unicode):
+ if isinstance(data, str):
token = data
secret = None
else:
@@ -405,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks):
class WebhookThreadPool(object):
def start(self):
- # Seperate process to emit messages to webhooks
- # which maintains one thread per webhook. Seperate
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
# process is required since on reload we need to stop
# and start the thread pool. In Python Threads can't be stopped
# so terminate the process and start again. Note: In transit