Skip to content

Commit

Permalink
feat: add logger to event bus, with optional ability to pass another …
Browse files Browse the repository at this point in the history
…logger in
  • Loading branch information
davidlougheed committed Jan 12, 2023
1 parent 1910a33 commit 0cc9ac0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
18 changes: 15 additions & 3 deletions bento_lib/events/_event_bus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import jsonschema
import logging
import redis

from typing import Callable, Dict, Optional, Union
Expand Down Expand Up @@ -38,14 +39,19 @@ def __init__(self, allow_fake: bool = False, **kwargs):

self._rc: Optional[redis.Redis] = None

logger = kwargs.pop("logger", None) or logging.getLogger(__name__)
self._logger: logging.Logger = logger

connection_data: dict = kwargs or default_connection_data

try:
self._rc = self._get_redis(**(kwargs or default_connection_data))
self._rc = self._get_redis(**connection_data)
self._rc.get("") # Dummy request to check connection
except redis.exceptions.ConnectionError as e:
self._rc = None
if not allow_fake:
raise e
# TODO: Log otherwise (as info)
logger.warning(f"Starting event bus in 'fake' mode (tried connection data: {connection_data})")

self._ps: Optional[redis.PubSub] = None

Expand Down Expand Up @@ -91,6 +97,8 @@ def start_event_loop(self) -> None:
if self._event_thread is not None:
return

self._logger.debug("Starting EventBus event loop")

self._ps = self._rc.pubsub()
self._ps.psubscribe(**self._ps_handlers)
self._event_thread = self._ps.run_in_thread(sleep_time=0.001, daemon=True)
Expand All @@ -103,6 +111,8 @@ def stop_event_loop(self) -> None:
if self._event_thread is None:
return

self._logger.debug("Stopping EventBus event loop")

self._event_thread.stop()
self._event_thread = None
self._ps = None
Expand Down Expand Up @@ -179,7 +189,9 @@ def _publish_event(
if self._rc is None:
return False

self._rc.publish(channel, self._make_event(event_type, event_data, attrs))
event: str = self._make_event(event_type, event_data, attrs)
self._logger.debug(f"Publishing event: {event}")
self._rc.publish(channel, event)
return True

def publish_service_event(self, service_artifact: str, event_type: str, event_data: Serializable) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion bento_lib/package.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = bento_lib
version = 5.1.1
version = 5.2.0
authors = David Lougheed, Paul Pillot
author_emails = [email protected], [email protected]

0 comments on commit 0cc9ac0

Please sign in to comment.