Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events: Horizontal scaling support in EventStream.subscribe #495

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ayon_server/addons/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self) -> None:
logging.error(f"Addons directory does not exist: {addons_dir}")
return None

for addon_name in os.listdir(addons_dir):
for addon_name in sorted(os.listdir(addons_dir)):
# ignore hidden directories (such as .git)
if addon_name.startswith("."):
continue
Expand Down
25 changes: 25 additions & 0 deletions ayon_server/api/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ayon_server.background.background_worker import BackgroundWorker
from ayon_server.config import ayonconfig
from ayon_server.entities import UserEntity
from ayon_server.events.eventstream import EventStream, HandlerType
from ayon_server.lib.redis import Redis
from ayon_server.utils import get_nickname, json_dumps, json_loads, obscure

Expand All @@ -22,6 +23,28 @@
]


async def _handle_subscribers_task(event_id: str, handlers: list[HandlerType]) -> None:
event = await EventStream.get(event_id)
for handler in handlers:
try:
await handler(event)
except Exception:
logging.debug(f"Error in global event handler: {handler}")


async def handle_subscribers(message: dict[str, Any]) -> None:
event_id = message.get("id", None)
store = message.get("store", None)
topic = message.get("topic", None)
if not (event_id and store):
return

handlers = EventStream.global_hooks.get(topic, {}).values()
if not handlers:
return
asyncio.create_task(_handle_subscribers_task(event_id, list(handlers)))


class Client:
id: str
sock: WebSocket
Expand Down Expand Up @@ -143,6 +166,8 @@ async def run(self) -> None:
else:
message = json_loads(raw_message["data"])

await handle_subscribers(message)

# TODO: much much smarter logic here
for _client_id, client in self.clients.items():
project_name = message.get("project", None)
Expand Down
41 changes: 30 additions & 11 deletions ayon_server/events/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,44 @@

class EventStream:
model: Type[EventModel] = EventModel
hooks: dict[str, dict[str, HandlerType]] = {}
local_hooks: dict[str, dict[str, HandlerType]] = {}
global_hooks: dict[str, dict[str, HandlerType]] = {}
Comment on lines +18 to +19
Copy link
Member

@iLLiCiTiT iLLiCiTiT Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this you can get rid of multiple if topic not in *hooks* conditions and avoid possible missing keys issues.

Suggested change
local_hooks: dict[str, dict[str, HandlerType]] = {}
global_hooks: dict[str, dict[str, HandlerType]] = {}
local_hooks: dict[str, dict[str, HandlerType]] = collections.defaultdict(dict)
global_hooks: dict[str, dict[str, HandlerType]] = collections.defaultdict(dict)

Not requirement tho...


@classmethod
def subscribe(cls, topic: str, handler: HandlerType) -> str:
def subscribe(
cls, topic: str, handler: HandlerType, all_nodes: bool = False
) -> str:
token = create_id()
if topic not in cls.hooks:
cls.hooks[topic] = {}
cls.hooks[topic][token] = handler
return token
if all_nodes:
if topic not in cls.global_hooks:
cls.global_hooks[topic] = {}
cls.global_hooks[topic][token] = handler
return token
else:
if topic not in cls.local_hooks:
cls.local_hooks[topic] = {}
cls.local_hooks[topic][token] = handler
return token
Comment on lines +26 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if all_nodes:
if topic not in cls.global_hooks:
cls.global_hooks[topic] = {}
cls.global_hooks[topic][token] = handler
return token
else:
if topic not in cls.local_hooks:
cls.local_hooks[topic] = {}
cls.local_hooks[topic][token] = handler
return token
hooks = cls.global_hooks if all_nodes else cls.local_hooks
topic_hooks = hooks.setdefault(topic, {})
topic_hooks[token] = handler
return token


@classmethod
def unsubscribe(cls, token: str) -> None:
# Local hooks
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works, but can be improved (multiple different ways):

        for hooks in (cls.global_hooks, cls.local_hooks):
            for topic, mapping in tuple(hooks.items()):
                mapping.pop(token, None)
                if not mapping:
                    hooks.pop(topic)

topics_to_remove = []
for topic in cls.local_hooks:
cls.local_hooks[topic].pop(token, None)
if not cls.local_hooks[topic]:
topics_to_remove.append(topic)
for topic in topics_to_remove:
cls.local_hooks.pop(topic)

# Global hooks
topics_to_remove = []
for topic in cls.hooks:
cls.hooks[topic].pop(token, None)
if not cls.hooks[topic]:
for topic in cls.global_hooks:
cls.global_hooks[topic].pop(token, None)
if not cls.global_hooks[topic]:
topics_to_remove.append(topic)
for topic in topics_to_remove:
cls.hooks.pop(topic)
cls.global_hooks.pop(topic)

@classmethod
async def dispatch(
Expand Down Expand Up @@ -192,7 +211,7 @@ async def dispatch(
)
)

handlers = cls.hooks.get(event.topic, {}).values()
handlers = cls.local_hooks.get(event.topic, {}).values()
for handler in handlers:
try:
await handler(event)
Expand Down