From e0f40abb7258e59bff4dde65df358fb2057edb4f Mon Sep 17 00:00:00 2001 From: berrytern Date: Fri, 27 Sep 2024 00:23:58 -0300 Subject: [PATCH 01/20] chore: Add CODE OF CONDUCT --- CODE_OF_CONDUCT.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 CODE_OF_CONDUCT.md diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..6e0b6c4 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,22 @@ +# Contributor Code of Conduct + +As contributors and maintainers of this project, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities. + +We are committed to making participation in this project a harassment-free experience for everyone, regardless of level of experience, gender identity, sexual orientation, disability, personal appearance, race, ethnicity, age, religion, or nationality. + +Examples of unacceptable behavior include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing others' private information, such as physical or electronic addresses, without explicit permission +* Other unethical or unprofessional conduct + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at [email@example.com]. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. + +## Attribution + +This Code of Conduct is adapted from the Contributor Covenant, version 2.1, available at https://www.contributor-covenant.org/version/2/1/code_of_conduct.html From 60b9d9df94b3e93c7d4d0eb638c635cc3212217c Mon Sep 17 00:00:00 2001 From: berrytern Date: Fri, 27 Sep 2024 09:07:20 -0300 Subject: [PATCH 02/20] chore: add discord link to CODE OF CONDUCT --- CODE_OF_CONDUCT.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 6e0b6c4..3bd30c9 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -15,7 +15,7 @@ Examples of unacceptable behavior include: ## Enforcement -Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at [email@example.com]. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at our [discord channel](https://discord.gg/RkXNeZpNZk). All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. ## Attribution From 9549478ef65824dbe881e450aac3fa575a149965 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 01:09:26 -0300 Subject: [PATCH 03/20] feat: add logic to process messages --- amqp_client_python/rabbitmq/async_channel.py | 137 +++++++++++-------- 1 file changed, 80 insertions(+), 57 deletions(-) diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index d24de20..8dbbc3c 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -1,4 +1,4 @@ -from typing import MutableMapping, Mapping, Optional, Union, Callable, Dict +from typing import MutableMapping, Mapping, Optional, Union, Callable, Dict, List from .async_channel_factory import AsyncChannelFactoryRabbitMQ from amqp_client_python.exceptions import ( NackException, @@ -16,6 +16,7 @@ from json import dumps, loads from uuid import uuid4 import logging +import re LOGGER = logging.getLogger(__name__) @@ -50,6 +51,7 @@ def __init__( self._prefetch_count = prefetch_count self.auto_ack = auto_ack self.subscribes: Dict[str, Dict[str, Union[Callable, str, float]]] = {} + self.topic_subscribes: List[str] = [] self.consumers: Dict[str, bool] = {} self.publisher_confirms = False self._message_number = 0 @@ -105,6 +107,7 @@ def on_channel_closed(self, channel, reason): :param pika.channel.Channel: The closed channel :param Exception reason: why the channel was closed """ + print("channel closed:", reason, flush=True) LOGGER.info(f"Channel {channel} was closed: {reason}") self._consuming = False if self._connection.is_closing or self._connection.is_closed: @@ -456,19 +459,24 @@ async def rpc_subscribe( response_timeout, content_type="application/json", exchange_type="direct", + durable=True, + auto_delete=False, ): await self.start_rpc_publisher() - self.add_subscribe( - queue_name, - routing_key, - callback, - content_type=content_type, - response_timeout=response_timeout, + self.register_queue( + exchange_name, queue_name, exchange_type, durable, auto_delete ) - self.setup_exchange(exchange_name, exchange_type) - self.queue_declare(queue_name) + + if routing_key not in self.subscribes[queue_name]: + self.subscribes[queue_name][routing_key] = { + "handle": callback, + "content_type": content_type, + "response_timeout": response_timeout or self.response_timeout, + } + if "*" in routing_key and routing_key not in self.topic_subscribes: + self.topic_subscribes.append(routing_key) self.queue_bind(queue_name, exchange_name, routing_key) - if queue_name not in self.consumers: + if not self.consumers[queue_name]: registered: Future[bool] = Future(loop=self.ioloop) self.consumers[queue_name] = True func = partial(self.on_message, queue_name) @@ -480,6 +488,23 @@ async def rpc_subscribe( ) await registered + def register_queue( + self, + exchange_name, + queue_name: str, + exchange_type="direct", + durable=True, + auto_delete=False, + ): + if queue_name not in self.subscribes: + if not len(self.subscribes): + self.subscribes[queue_name] = {} + self.setup_exchange(exchange_name, exchange_type) + self.queue_declare(queue_name, durable=durable, auto_delete=auto_delete) + + if queue_name not in self.consumers: + self.consumers[queue_name] = False + async def subscribe( self, exchange_name, @@ -489,18 +514,23 @@ async def subscribe( response_timeout, content_type="application/json", exchange_type="direct", + durable=True, + auto_delete=False, ): - self.add_subscribe( - queue_name, - routing_key, - callback, - content_type=content_type, - response_timeout=response_timeout, + self.register_queue( + exchange_name, queue_name, exchange_type, durable, auto_delete ) - self.setup_exchange(exchange_name, exchange_type) - self.queue_declare(queue_name, durable=True) + + if routing_key not in self.subscribes[queue_name]: + self.subscribes[queue_name][routing_key] = { + "handle": callback, + "content_type": content_type, + "response_timeout": response_timeout or self.response_timeout, + } + if "*" in routing_key and routing_key not in self.topic_subscribes: + self.topic_subscribes.append(routing_key) self.queue_bind(queue_name, exchange_name, routing_key) - if queue_name not in self.consumers: + if not self.consumers[queue_name]: self.consumers[queue_name] = True func = partial(self.on_message, queue_name) self._channel.basic_consume( @@ -522,35 +552,40 @@ def on_message( :param bytes body: The message body """ + async def process_message(sub): + nonlocal body + body = loads(body) + response = await wait_for( + sub["handle"](*body["handle"]), + timeout=sub["response_timeout"], + ) + if self.rpc_publisher_started and response and props.reply_to: + self._channel_rpc.basic_publish( + "", + props.reply_to, + response, + properties=BasicProperties( + correlation_id=props.correlation_id, + content_type=sub["content_type"], + type="normal", + ), + ) + not self.auto_ack and not self._channel.basic_ack( + basic_deliver.delivery_tag + ) + async def handle_message(queue_name, basic_deliver, props, body): try: if basic_deliver.routing_key in self.subscribes[queue_name]: - body = loads(body) - response = await wait_for( - self.subscribes[queue_name][basic_deliver.routing_key][ - "handle" - ](*body["handle"]), - timeout=self.subscribes[queue_name][basic_deliver.routing_key][ - "response_timeout" - ], - ) - if self.rpc_publisher_started and response and props.reply_to: - self._channel_rpc.basic_publish( - "", - props.reply_to, - response, - properties=BasicProperties( - correlation_id=props.correlation_id, - content_type=self.subscribes[queue_name][ - basic_deliver.routing_key - ]["content_type"], - type="normal", - ), - ) - not self.auto_ack and not self._channel.basic_ack( - basic_deliver.delivery_tag + await process_message( + self.subscribes[queue_name][basic_deliver.routing_key] ) else: + for routing_key in self.topic_subscribes: + if re.match(routing_key, basic_deliver.routing_key): + return await process_message( + self.subscribes[queue_name][routing_key] + ) if self.rpc_publisher_started and props.reply_to: self._channel_rpc.basic_publish( "", @@ -562,9 +597,10 @@ async def handle_message(queue_name, basic_deliver, props, body): type="error", ), ) - not self.auto_ack and self._channel.basic_nack( + return not self.auto_ack and self._channel.basic_nack( basic_deliver.delivery_tag, requeue=False ) + except BaseException as err: if self.rpc_publisher_started and props.reply_to: self._channel_rpc.basic_publish( @@ -584,16 +620,3 @@ async def handle_message(queue_name, basic_deliver, props, body): self.ioloop.create_task( # type: ignore handle_message(queue_name, basic_deliver, props, body) ) - - def add_subscribe( - self, queue_name, routing_key, handle, content_type, response_timeout=None - ): - if queue_name not in self.subscribes: - if not len(self.subscribes): - self.subscribes[queue_name] = {} - if routing_key not in self.subscribes[queue_name]: - self.subscribes[queue_name][routing_key] = { - "handle": handle, - "content_type": content_type, - "response_timeout": response_timeout or self.response_timeout, - } From 436f65130a68a73d527049d148cdb317dd2a8914 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 01:42:52 -0300 Subject: [PATCH 04/20] chore: set default exchange to topic --- amqp_client_python/rabbitmq/async_channel.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 8dbbc3c..82558d5 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -452,13 +452,13 @@ def clean_rpc_response(self, corr_id: str, _fut): async def rpc_subscribe( self, - exchange_name, + exchange_name: str, routing_key: str, queue_name: str, callback, response_timeout, content_type="application/json", - exchange_type="direct", + exchange_type="topic", durable=True, auto_delete=False, ): @@ -574,7 +574,7 @@ async def process_message(sub): basic_deliver.delivery_tag ) - async def handle_message(queue_name, basic_deliver, props, body): + async def handle_message(queue_name, basic_deliver, props): try: if basic_deliver.routing_key in self.subscribes[queue_name]: await process_message( @@ -618,5 +618,5 @@ async def handle_message(queue_name, basic_deliver, props, body): ) self.ioloop.create_task( # type: ignore - handle_message(queue_name, basic_deliver, props, body) + handle_message(queue_name, basic_deliver, props) ) From 82b1b326d962d1374be4d225b24ab37265c8abd8 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 15:41:51 -0300 Subject: [PATCH 05/20] chore: minor refact and add HandlerType --- amqp_client_python/domain/utils/__init__.py | 1 + amqp_client_python/domain/utils/types.py | 7 ++ amqp_client_python/rabbitmq/async_channel.py | 67 ++++++++++---------- 3 files changed, 43 insertions(+), 32 deletions(-) create mode 100644 amqp_client_python/domain/utils/types.py diff --git a/amqp_client_python/domain/utils/__init__.py b/amqp_client_python/domain/utils/__init__.py index 720f872..a2d94de 100644 --- a/amqp_client_python/domain/utils/__init__.py +++ b/amqp_client_python/domain/utils/__init__.py @@ -1 +1,2 @@ from .connection_type import ConnectionType +from .types import HandlerType diff --git a/amqp_client_python/domain/utils/types.py b/amqp_client_python/domain/utils/types.py new file mode 100644 index 0000000..5a659a8 --- /dev/null +++ b/amqp_client_python/domain/utils/types.py @@ -0,0 +1,7 @@ +from typing import TypedDict, Optional, Callable + + +class HandlerType(TypedDict): + response_timeout: Optional[int] + handle: Callable + content_type: str diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 82558d5..8bfef7f 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -1,4 +1,4 @@ -from typing import MutableMapping, Mapping, Optional, Union, Callable, Dict, List +from typing import MutableMapping, Mapping, Optional, Union, Dict, List from .async_channel_factory import AsyncChannelFactoryRabbitMQ from amqp_client_python.exceptions import ( NackException, @@ -8,6 +8,7 @@ EventBusException, ) from amqp_client_python.signals import Signal, Event +from amqp_client_python.domain.utils.types import HandlerType from pika.adapters.asyncio_connection import AsyncioConnection from pika.channel import Channel from pika import BasicProperties, DeliveryMode @@ -50,8 +51,8 @@ def __init__( self.consumer_tag = None self._prefetch_count = prefetch_count self.auto_ack = auto_ack - self.subscribes: Dict[str, Dict[str, Union[Callable, str, float]]] = {} - self.topic_subscribes: List[str] = [] + self.subscribes: Dict[str, Dict[str, HandlerType]] = {} + self.re_subscribes: List[str] = [] self.consumers: Dict[str, bool] = {} self.publisher_confirms = False self._message_number = 0 @@ -466,15 +467,10 @@ async def rpc_subscribe( self.register_queue( exchange_name, queue_name, exchange_type, durable, auto_delete ) + self.register_handler( + queue_name, routing_key, callback, content_type, response_timeout + ) - if routing_key not in self.subscribes[queue_name]: - self.subscribes[queue_name][routing_key] = { - "handle": callback, - "content_type": content_type, - "response_timeout": response_timeout or self.response_timeout, - } - if "*" in routing_key and routing_key not in self.topic_subscribes: - self.topic_subscribes.append(routing_key) self.queue_bind(queue_name, exchange_name, routing_key) if not self.consumers[queue_name]: registered: Future[bool] = Future(loop=self.ioloop) @@ -505,30 +501,37 @@ def register_queue( if queue_name not in self.consumers: self.consumers[queue_name] = False + def register_handler( + self, queue_name, routing_key, callback, content_type, response_timeout + ): + if routing_key not in self.subscribes[queue_name]: + self.subscribes[queue_name][routing_key] = { + "handle": callback, + "content_type": content_type, + "response_timeout": response_timeout or self.response_timeout, + } + if "*" in routing_key and routing_key not in self.re_subscribes: + self.re_subscribes.append(routing_key) + async def subscribe( self, - exchange_name, + exchange_name: str, routing_key: str, queue_name: str, callback, - response_timeout, - content_type="application/json", - exchange_type="direct", - durable=True, - auto_delete=False, - ): + response_timeout: Optional[int], + content_type: str = "application/json", + exchange_type: str = "direct", + durable: bool = True, + auto_delete: bool = False, + ) -> None: self.register_queue( exchange_name, queue_name, exchange_type, durable, auto_delete ) + self.register_handler( + queue_name, routing_key, callback, content_type, response_timeout + ) - if routing_key not in self.subscribes[queue_name]: - self.subscribes[queue_name][routing_key] = { - "handle": callback, - "content_type": content_type, - "response_timeout": response_timeout or self.response_timeout, - } - if "*" in routing_key and routing_key not in self.topic_subscribes: - self.topic_subscribes.append(routing_key) self.queue_bind(queue_name, exchange_name, routing_key) if not self.consumers[queue_name]: self.consumers[queue_name] = True @@ -539,7 +542,7 @@ async def subscribe( def on_message( self, queue_name, _unused_channel, basic_deliver, props: BasicProperties, body - ): + ) -> None: """Invoked by pika when a message is delivered from RabbitMQ. The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag and @@ -552,12 +555,12 @@ def on_message( :param bytes body: The message body """ - async def process_message(sub): + async def process_message(handler: HandlerType): nonlocal body body = loads(body) response = await wait_for( - sub["handle"](*body["handle"]), - timeout=sub["response_timeout"], + handler["handle"](*body["handle"]), + timeout=handler["response_timeout"], ) if self.rpc_publisher_started and response and props.reply_to: self._channel_rpc.basic_publish( @@ -566,7 +569,7 @@ async def process_message(sub): response, properties=BasicProperties( correlation_id=props.correlation_id, - content_type=sub["content_type"], + content_type=handler["content_type"], type="normal", ), ) @@ -581,7 +584,7 @@ async def handle_message(queue_name, basic_deliver, props): self.subscribes[queue_name][basic_deliver.routing_key] ) else: - for routing_key in self.topic_subscribes: + for routing_key in self.re_subscribes: if re.match(routing_key, basic_deliver.routing_key): return await process_message( self.subscribes[queue_name][routing_key] From 48ca47571c4954de5a8aa42fac1dedec089936f1 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 15:52:17 -0300 Subject: [PATCH 06/20] chore: add deprecated warning to old synchronous eventbus --- amqp_client_python/rabbitmq/eventbus_rabbitmq.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/amqp_client_python/rabbitmq/eventbus_rabbitmq.py b/amqp_client_python/rabbitmq/eventbus_rabbitmq.py index 7f40074..1ceb847 100644 --- a/amqp_client_python/rabbitmq/eventbus_rabbitmq.py +++ b/amqp_client_python/rabbitmq/eventbus_rabbitmq.py @@ -9,10 +9,16 @@ from concurrent.futures import Future as syncFuture from asyncio import AbstractEventLoop import asyncio +import warnings class EventbusRabbitMQ: def __init__(self, config: Config) -> None: + warnings.warn( + f"{self.__class__.__name__} is deprecated and will no longer be supported in future releases", + DeprecationWarning, + stacklevel=2, + ) self.pub_connection = ConnectionRabbitMQ(connection_type=ConnectionType.PUBLISH) self.sub_connection = ConnectionRabbitMQ( connection_type=ConnectionType.SUBSCRIBE From c37a152f1df49d153da3fbb92e72dd1758b0393a Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 16:17:20 -0300 Subject: [PATCH 07/20] chore: update test case --- amqp_client_python/rabbitmq/async_channel.py | 2 +- tests/unit/channels/test_rpc_subscribe.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 8bfef7f..37fe2b2 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -473,7 +473,7 @@ async def rpc_subscribe( self.queue_bind(queue_name, exchange_name, routing_key) if not self.consumers[queue_name]: - registered: Future[bool] = Future(loop=self.ioloop) + registered: Future[bool] = self.ioloop.create_future() self.consumers[queue_name] = True func = partial(self.on_message, queue_name) self._channel.basic_consume( diff --git a/tests/unit/channels/test_rpc_subscribe.py b/tests/unit/channels/test_rpc_subscribe.py index 6353f3e..a8a98c6 100644 --- a/tests/unit/channels/test_rpc_subscribe.py +++ b/tests/unit/channels/test_rpc_subscribe.py @@ -19,11 +19,12 @@ async def test_async_channel_subscribe( future_publisher = Future(loop=loop) future_publisher.set_result(True) - connection_mock.ioloop.create_future.side_effect = [ - future_publisher, - ] + connection_mock.ioloop.call_later = loop.call_later + connection_mock.ioloop.create_future.return_value = future_publisher channel_factory_mock.create_channel.return_value = channel_mock - channel = AsyncChannel(channel_factory=channel_factory_mock, channel_type=ConnectionType.RPC_SERVER) + channel = AsyncChannel( + channel_factory=channel_factory_mock, channel_type=ConnectionType.RPC_SERVER + ) channel.open(connection_mock) async def handle(*body): @@ -34,7 +35,7 @@ async def handle(*body): ) assert channel._channel == channel_mock assert iscoroutine(rpc_subscribe) - assert await rpc_subscribe is None + assert (await rpc_subscribe) is None channel_mock.basic_consume.assert_called_once() assert channel_mock.basic_consume.call_args.args == (queue_name,) @@ -54,9 +55,7 @@ async def test_rpc_subscribe_publisher_started( future_publisher = Future(loop=loop) if consumer: future_publisher.set_result(True) - connection_mock.ioloop.create_future.side_effect = [ - future_publisher, - ] + connection_mock.ioloop.create_future.return_value = future_publisher connection_mock.ioloop.call_later = loop.call_later channel_factory_mock.create_channel.return_value = channel_mock channel = AsyncChannel(channel_factory=channel_factory_mock) @@ -66,10 +65,10 @@ async def handle(*body): return b"result" rpc_subscribe = channel.rpc_subscribe( - exchange, routing_key, queue_name, handle, content_type, connection_mock.ioloop + exchange, routing_key, queue_name, handle, content_type ) if consumer: - assert await rpc_subscribe is None + assert (await rpc_subscribe) is None assert channel.rpc_publisher_starting is True else: with pytest.raises(EventBusException): From 07dc7524ec1f066bef41fa218c779567553f07b3 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 16:30:31 -0300 Subject: [PATCH 08/20] chore: adjustments in mypy checks --- amqp_client_python/rabbitmq/async_channel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 37fe2b2..2adf069 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -473,6 +473,7 @@ async def rpc_subscribe( self.queue_bind(queue_name, exchange_name, routing_key) if not self.consumers[queue_name]: + assert self.ioloop is not None registered: Future[bool] = self.ioloop.create_future() self.consumers[queue_name] = True func = partial(self.on_message, queue_name) From 5a579931d4a2e436a696b02bad12755b4aca4cb9 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 16:53:56 -0300 Subject: [PATCH 09/20] chore: set default subscribe's exchange to topic --- amqp_client_python/rabbitmq/async_channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 2adf069..b0aaaa3 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -522,7 +522,7 @@ async def subscribe( callback, response_timeout: Optional[int], content_type: str = "application/json", - exchange_type: str = "direct", + exchange_type: str = "topic", durable: bool = True, auto_delete: bool = False, ) -> None: From b8b7de5d4d8ae9bb460f8e47f5cb0771574e7ccc Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:27:56 -0300 Subject: [PATCH 10/20] feat(messaging): simplify message structure and delegate data handling - Remove body wrapper from message structure - Transfer responsibility for data management to end-users - Improve performance by reducing message overhead - BREAKING CHANGE: Clients need to handle raw message data directly --- README.md | 54 ++++++----------- amqp_client_python/rabbitmq/async_channel.py | 8 +-- .../rabbitmq/async_connection.py | 6 +- .../rabbitmq/async_eventbus_rabbitmq.py | 44 +++++++------- .../rabbitmq/eventbus_wrapper_rabbitmq.py | 60 ++++++++++++------- 5 files changed, 85 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 06b166b..f22551c 100644 --- a/README.md +++ b/README.md @@ -57,34 +57,26 @@ from amqp_client_python import ( AsyncEventbusRabbitMQ, Config, Options ) -from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler config = Config(Options("queue", "rpc_queue", "rpc_exchange")) eventbus = AsyncEventbusRabbitMQ(config) # publish -class ExampleEvent(IntegrationEvent): - EVENT_NAME: str = "ExampleEvent" - def __init__(self, event_type: str, message = []) -> None: - super().__init__(self.EVENT_NAME, event_type) - self.message = message -publish_event = ExampleEvent(rpc_exchange, ["message"]) -eventbus.publish(publish_event, rpc_routing_key, "direct") +eventbus.publish("rpc_exchange", "routing.key", "direct") # subscribe -class ExampleEventHandler(IntegrationEventHandler): - async def handle(self, body) -> None: - print(body) # handle messages -await eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key) +async def subscribe_handler(body) -> None: + print(body, type(body), flush=True) # handle messages +await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler) # rpc_publish -response = await eventbus.rpc_client(rpc_exchange, "user.find", ["content_message"]) +response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content") # provider -async def handle2(*body) -> bytes: +async def rpc_provider_handler(body) -> bytes: print(f"body: {body}") return b"content" -await eventbus.provide_resource("user.find", handle) +await eventbus.provide_resource("user.find", rpc_provider_handler) ``` -
sync usage +
sync usage(deprecated) ```Python from amqp_client_python import ( @@ -153,40 +145,30 @@ while running: ```Python from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options -from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler -config = Config(Options(queue, rpc_queue, rpc_exchange)) +config = Config(Options("queue", "rpc_queue", "rpc_exchange")) eventbus = EventbusWrapperRabbitMQ(config=config) -class ExampleEvent(IntegrationEvent): - EVENT_NAME: str = "ExampleEvent" - def __init__(self, event_type: str, message = []) -> None: - super().__init__(self.EVENT_NAME, event_type) - self.message = message -class ExampleEventHandler(IntegrationEventHandler): - async def handle(self, body) -> None: - print(body,"subscribe") +async def subscribe_handler(body) -> None: + print(f"{body}", type(body), flush=True) -async def handle(*body): - print(body[0], "rpc_provider") - return f"{body[0]}".encode("utf-8") +async def rpc_provider_handler(body) -> bytes: + print(f"handle - {body}", type(body), flush=True) + return f"{body}".encode("utf-8") -subscribe_event = ExampleEvent(rpc_exchange) -publish_event = ExampleEvent(rpc_exchange, ["message"]) -subscribe_event_handle = ExampleEventHandler() # rpc_provider -eventbus.provide_resource(rpc_routing_key+"2", handle).result() +eventbus.provide_resource("user.find", rpc_provider_handler).result() # subscribe -eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key).result() +eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result() count = 0 running = True while running: try: count += 1 # rpc_client call - eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).result().decode("utf-8") + eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8") # publish - eventbus.publish(publish_event, rpc_routing_key, "direct").result() + eventbus.publish("rpc_exchange", "routing.key", "direct").result() #running = False except KeyboardInterrupt: running=False diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index b0aaaa3..ebede6b 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -340,7 +340,6 @@ async def rpc_client( **key_args, ): future = self.ioloop.create_future() # type: ignore - message = dumps({"resource_name": routing_key, "handle": body}) corr_id = str(uuid4()) self.rpc_futures[corr_id] = {"response": future} clean_response = partial(self.clean_rpc_response, corr_id) @@ -351,7 +350,7 @@ async def rpc_client( self._channel.basic_publish( exchange_name, routing_key, - message, + dumps(body), properties=BasicProperties( reply_to=self._callback_queue, correlation_id=corr_id, @@ -409,11 +408,10 @@ async def publish( expiration: Optional[Union[str, None]] = None, **key_args, ): - message = dumps({"handle": body}) self._channel.basic_publish( exchange_name, routing_key, - message, + dumps(body), properties=BasicProperties( reply_to=self._callback_queue, content_type=content_type, @@ -560,7 +558,7 @@ async def process_message(handler: HandlerType): nonlocal body body = loads(body) response = await wait_for( - handler["handle"](*body["handle"]), + handler["handle"](body), timeout=handler["response_timeout"], ) if self.rpc_publisher_started and response and props.reply_to: diff --git a/amqp_client_python/rabbitmq/async_connection.py b/amqp_client_python/rabbitmq/async_connection.py index 9050816..6b990bf 100644 --- a/amqp_client_python/rabbitmq/async_connection.py +++ b/amqp_client_python/rabbitmq/async_connection.py @@ -170,8 +170,8 @@ async def rpc_client( self, exchange_name: str, routing_key: str, - body, - content_type, + body: Any, + content_type: str, timeout, delivery_mode, expiration, @@ -192,7 +192,7 @@ async def publish( self, exchange_name: str, routing_key: str, - body, + body: Any, content_type, timeout: float, delivery_mode, diff --git a/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py b/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py index dc6a0db..26cadf3 100644 --- a/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py +++ b/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py @@ -1,7 +1,6 @@ from typing import Callable, Awaitable, Optional, Union, Any, List from .async_connection import AsyncConnection from ..domain.utils import ConnectionType -from ..event import IntegrationEvent, AsyncSubscriberHandler from amqp_client_python.domain.models import Config from asyncio import AbstractEventLoop, get_event_loop from pika import DeliveryMode @@ -100,7 +99,7 @@ async def rpc_client( self, exchange: str, routing_key: str, - body: List[Any], + body: Any, content_type: str = "application/json", timeout: float = 5, connection_timeout: float = 16, @@ -155,9 +154,9 @@ async def add_rpc_client(): async def publish( self, - event: IntegrationEvent, + exchange_name: str, routing_key: str, - body: List[Any], + body: Any, content_type: str = "application/json", timeout: float = 5, connection_timeout: float = 16, @@ -190,13 +189,14 @@ async def publish( Examples: - >>> publish_event = ExampleEvent("example.rpc") - >>> await eventbus.publish(publish_event, "user.find3", ["content_message"]) + >>> exchange_name = "example.rpc" + >>> routing_key = "user.find3" + >>> await eventbus.publish(exchange_name, routing_key, ["content_message"]) """ async def add_publish(): return await self._pub_connection.publish( - event.event_type, + exchange_name, routing_key, body, content_type, @@ -212,7 +212,7 @@ async def add_publish(): async def provide_resource( self, name: str, - callback: Callable[[List[Any]], Awaitable[Union[bytes, str]]], + handler: Callable[[List[Any]], Awaitable[Union[bytes, str]]], response_timeout: Optional[int] = None, connection_timeout: int = 16, ) -> None: @@ -221,7 +221,7 @@ async def provide_resource( Args: name: routing_key name - callback: message handler + handler: message handler, it will be called when a message is received response_timeout: timeout in seconds for waiting for process the received message connection_timeout: timeout for waiting for connection restabilishment @@ -232,7 +232,7 @@ async def provide_resource( AutoReconnectException: when cannout reconnect on the gived timeout Examples: - >>> async def handle(*body) -> Union[bytes, str]: + >>> async def handle(body) -> Union[bytes, str]: print(f"received message: {body}") return b"[]" >>> await eventbus.provide_resource("user.find", handle) @@ -243,7 +243,7 @@ async def add_resource(): self.config.options.rpc_queue_name, self.config.options.rpc_exchange_name, name, - callback, + handler, response_timeout, ) @@ -252,9 +252,9 @@ async def add_resource(): async def subscribe( self, - event: IntegrationEvent, - handler: AsyncSubscriberHandler, + exchange_name: str, routing_key: str, + handler: Callable[[Any], Awaitable[None]], response_timeout: Optional[float] = None, connection_timeout: int = 16, ) -> None: @@ -262,8 +262,9 @@ async def subscribe( Register a provider to listen on queue of bus Args: - name: routing_key name - callback: message handler + exchange_name: exchange name + routing_key: routing_key name + handler: message handler, it will be called when a message is received response_timeout: timeout in seconds for waiting for process the received message connection_timeout: timeout for waiting for connection restabilishment @@ -274,18 +275,21 @@ async def subscribe( AutoReconnectException: when cannout reconnect on the gived timeout Examples: - >>> async def handle(*body) -> None: + >>> async def handle(body) -> None: print(f"received message: {body}") - >>> subscribe_event = ExampleEvent("example.rpc") - >>> await eventbus.subscribe(subscribe_event, event_handle, "user.find3") + >>> exchange_name = "example" + >>> routing_key = "user.find3" + >>> response_timeout = 20 + >>> connection_timeout = 16 + >>> await eventbus.subscribe(exchange_name, routing_key, handle, response_timeout, connection_timeout) """ async def add_subscribe(): await self._sub_connection.subscribe( self.config.options.queue_name, - event.event_type, + exchange_name, routing_key, - handler.handle, + handler, response_timeout, ) diff --git a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py index f15b6aa..360d752 100644 --- a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py +++ b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py @@ -1,6 +1,5 @@ from typing import Callable, Awaitable, Optional, Union, Any, List from .async_eventbus_rabbitmq import AsyncEventbusRabbitMQ -from ..event import IntegrationEvent, AsyncSubscriberHandler from ..exceptions import BlockingException, ThreadUnsafeException from amqp_client_python.domain.models import Config from pika import DeliveryMode @@ -46,7 +45,7 @@ def rpc_client( self, exchange: str, routing_key: str, - body: List[Any], + body: Any, content_type="application/json", timeout=5, ) -> Future: @@ -65,7 +64,7 @@ async def async_rpc_client( self, exchange: str, routing_key: str, - body: List[Any], + body: Any, content_type="application/json", timeout=5, ): @@ -79,9 +78,9 @@ async def async_rpc_client( async def async_publish( self, - event: IntegrationEvent, + exchange_name: str, routing_key: str, - body: List[Any], + body: Any, content_type="application/json", timeout=5, connection_timeout: int = 16, @@ -94,7 +93,7 @@ async def async_publish( "Cannot run async call on this thread, try to use sync thread safe methods" ) await self._async_eventbus.publish( - event, + exchange_name, routing_key, body, content_type, @@ -107,9 +106,9 @@ async def async_publish( def publish( self, - event: IntegrationEvent, + exchange_name: str, routing_key: str, - body: List[Any], + body: Any, content_type="application/json", timeout=5, connection_timeout: int = 16, @@ -123,19 +122,26 @@ def publish( ) return run_coroutine_threadsafe( self._async_eventbus.publish( - event, routing_key, body, content_type, timeout, connection_timeout, - delivery_mode, expiration, **kwargs + exchange_name, + routing_key, + body, + content_type, + timeout, + connection_timeout, + delivery_mode, + expiration, + **kwargs ), self._loop, ) def subscribe( self, - event: IntegrationEvent, - handler: AsyncSubscriberHandler, + exchange_name: str, routing_key: str, + handler: Callable[[Any], Awaitable[None]], response_timeout: Optional[int] = None, - connection_timeout: int = 16 + connection_timeout: int = 16, ) -> Future: if self._thread.ident == current_thread().ident: raise BlockingException( @@ -143,39 +149,45 @@ def subscribe( ) return run_coroutine_threadsafe( self._async_eventbus.subscribe( - event, handler, routing_key, response_timeout, connection_timeout + exchange_name, + routing_key, + handler, + response_timeout, + connection_timeout, ), self._loop, ) async def async_subscribe( self, - event: IntegrationEvent, - handler: AsyncSubscriberHandler, + handler: Callable[[Any], Awaitable[None]], routing_key: str, response_timeout: Optional[int] = None, - connection_timeout: int = 16 + connection_timeout: int = 16, ): if self._thread.ident != current_thread().ident: raise ThreadUnsafeException( "Cannot run async call on this thread, try to use sync thread safe methods" ) await self._async_eventbus.subscribe( - event, handler, routing_key, response_timeout, connection_timeout + handler, routing_key, response_timeout, connection_timeout ) def provide_resource( - self, name: str, + self, + name: str, callback: Callable[[List[Any]], Awaitable[Union[bytes, str]]], response_timeout: Optional[int] = None, - connection_timeout: int = 16 + connection_timeout: int = 16, ) -> Future: if self._thread.ident == current_thread().ident: raise BlockingException( "Cannot run sync blocking call on async thread, try to use async methods with an await expression" ) return run_coroutine_threadsafe( - self._async_eventbus.provide_resource(name, callback, response_timeout, connection_timeout), + self._async_eventbus.provide_resource( + name, callback, response_timeout, connection_timeout + ), self._loop, ) @@ -184,13 +196,15 @@ async def async_provide_resource( name: str, callback: Callable[[List[Any]], Awaitable[Union[bytes, str]]], response_timeout: Optional[int] = None, - connection_timeout: int = 16 + connection_timeout: int = 16, ): if self._thread.ident != current_thread().ident: raise ThreadUnsafeException( "Cannot run async call on this thread, try to use sync thread safe methods" ) - await self._async_eventbus.provide_resource(name, callback, response_timeout, connection_timeout) + await self._async_eventbus.provide_resource( + name, callback, response_timeout, connection_timeout + ) def dispose(self): run_coroutine_threadsafe(self._async_eventbus.dispose(True), self._loop) From d8d0f23e66e91d4a5fbf195f28df6f8bb974837e Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:34:13 -0300 Subject: [PATCH 11/20] fix: update async_subscribe method of EventbusWrapper to new interface --- amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py index 360d752..425d5e9 100644 --- a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py +++ b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py @@ -160,8 +160,9 @@ def subscribe( async def async_subscribe( self, - handler: Callable[[Any], Awaitable[None]], + exchange_name: str, routing_key: str, + handler: Callable[[Any], Awaitable[None]], response_timeout: Optional[int] = None, connection_timeout: int = 16, ): @@ -170,7 +171,7 @@ async def async_subscribe( "Cannot run async call on this thread, try to use sync thread safe methods" ) await self._async_eventbus.subscribe( - handler, routing_key, response_timeout, connection_timeout + exchange_name, routing_key, handler, response_timeout, connection_timeout ) def provide_resource( From 345d1ea06a58ab38a220622b4eff50a30f6b29bb Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:34:46 -0300 Subject: [PATCH 12/20] test: update test to new interface --- .../eventbus/test_async_eventbus_subscribe.py | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/tests/unit/eventbus/test_async_eventbus_subscribe.py b/tests/unit/eventbus/test_async_eventbus_subscribe.py index 5c6524f..1a01140 100644 --- a/tests/unit/eventbus/test_async_eventbus_subscribe.py +++ b/tests/unit/eventbus/test_async_eventbus_subscribe.py @@ -1,5 +1,4 @@ from amqp_client_python import AsyncEventbusRabbitMQ -from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler from asyncio import iscoroutinefunction from tests.unit.eventbus.default import async_add_callback import pytest @@ -7,8 +6,7 @@ @pytest.mark.asyncio_cooperative async def test_async_eventbus_subscribe_deep(async_connection_mock, config_mock): - event_name, exchange, routing_key, response_time = ( - "example_event", + exchange, routing_key, response_time = ( "ex_example", "rk_example", None, @@ -18,32 +16,24 @@ async def test_async_eventbus_subscribe_deep(async_connection_mock, config_mock) eventbus._sub_connection = async_connection_mock eventbus._sub_connection.add_callback = async_add_callback # create event - event = IntegrationEvent(event_name, exchange) # create event handler - class CreateUserEventHandler(IntegrationEventHandler): - # inject dependencies - # def __init__(self, repository) -> None: - # self.repository = repository - - async def handle(self, *body): - # validate input here!! - # can use pydantic model -> user = User(*body) - # do stuff here: - # self.repository.create_user(user) - pass - - event_handler = CreateUserEventHandler() + async def handle(body) -> None: + # validate input here!! + # can use pydantic model -> user = User(*body) + # do stuff here: + # self.repository.create_user(user) + pass assert iscoroutinefunction(eventbus.subscribe) - assert await eventbus.subscribe(event, event_handler, routing_key) is None + assert await eventbus.subscribe(exchange, routing_key, handle) is None # test connection will be open eventbus._sub_connection.open.assert_called_once_with(config.url) # test connection.subscribe will be called with right fields eventbus._sub_connection.subscribe.assert_called_once_with( config.options.queue_name, - event.event_type, + exchange, routing_key, - event_handler.handle, + handle, response_time, ) From 748304b51f439c9b5a06da9e81846dc310ae8155 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:36:35 -0300 Subject: [PATCH 13/20] test: update test to new interface --- .../eventbus/test_async_eventbus_publish.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/unit/eventbus/test_async_eventbus_publish.py b/tests/unit/eventbus/test_async_eventbus_publish.py index f9330c8..6af2676 100644 --- a/tests/unit/eventbus/test_async_eventbus_publish.py +++ b/tests/unit/eventbus/test_async_eventbus_publish.py @@ -1,5 +1,4 @@ from amqp_client_python import AsyncEventbusRabbitMQ, DeliveryMode -from amqp_client_python.event import IntegrationEvent from asyncio import iscoroutinefunction from tests.unit.eventbus.default import async_add_callback import pytest @@ -8,20 +7,23 @@ @pytest.mark.asyncio_cooperative async def test_async_eventbus_publish_surface(async_connection_mock, config_mock): - event_name, exchange, routing_key, body, connection_timeout = ( - "example_event", + exchange, routing_key, body, connection_timeout = ( "ex_example", "rk_example", ["content"], - randint(1, 16) + randint(1, 16), ) eventbus = AsyncEventbusRabbitMQ(config_mock) eventbus._pub_connection = async_connection_mock # create event - event = IntegrationEvent(event_name, exchange) assert iscoroutinefunction(eventbus.publish) - assert await eventbus.publish(event, routing_key, body, connection_timeout=connection_timeout) is not None + assert ( + await eventbus.publish( + exchange, routing_key, body, connection_timeout=connection_timeout + ) + is not None + ) # test connection will be open eventbus._pub_connection.open.assert_called_once_with(config_mock.build().url) # test if will try when connection and channel is open @@ -34,14 +36,12 @@ async def test_async_eventbus_publish_surface(async_connection_mock, config_mock @pytest.mark.asyncio_cooperative async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): ( - event_name, exchange, routing_key, body, content_type, timeout, ) = ( - "example_event", "ex_example", "rk_example", ["content"], @@ -52,12 +52,11 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): eventbus._pub_connection = async_connection_mock eventbus._pub_connection.add_callback = async_add_callback # create event - event = IntegrationEvent(event_name, exchange) assert iscoroutinefunction(eventbus.publish) assert ( await eventbus.publish( - event, + exchange, routing_key, body, content_type, @@ -69,7 +68,7 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): eventbus._pub_connection.open.assert_called_once_with(config_mock.build().url) # test if will try when connection and channel is open eventbus._pub_connection.publish.assert_called_once_with( - event.event_type, + exchange, routing_key, body, content_type, From 175d3ff20482981cf1730ef00c760e265bb8fb1a Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:37:22 -0300 Subject: [PATCH 14/20] test: update test to new interface --- tests/unit/channels/test_publish.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/channels/test_publish.py b/tests/unit/channels/test_publish.py index 8cae674..a518bdb 100644 --- a/tests/unit/channels/test_publish.py +++ b/tests/unit/channels/test_publish.py @@ -31,7 +31,7 @@ async def test_channel_publish(connection_mock, channel_mock, channel_factory_mo assert channel_mock.basic_publish.call_args.args == ( exchange, routing_key, - dumps({"handle": body}), + dumps(body), ) assert result is None From 6038f67ffdf1add57253b03a3d85fb716987b786 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:38:16 -0300 Subject: [PATCH 15/20] test: update test to new interface --- tests/unit/channels/test_async_rpc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/channels/test_async_rpc_client.py b/tests/unit/channels/test_async_rpc_client.py index 2f8f50a..cd6bb99 100644 --- a/tests/unit/channels/test_async_rpc_client.py +++ b/tests/unit/channels/test_async_rpc_client.py @@ -43,7 +43,7 @@ async def test_rpc_client(connection_mock, channel_mock, channel_factory_mock): assert channel_mock.basic_publish.call_args.args == ( exchange, routing_key, - dumps({"resource_name": routing_key, "handle": body}), + dumps(body), ) assert result == expected_result From 90f656db2796cbc573e0ecd5e83df486d5354206 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:40:41 -0300 Subject: [PATCH 16/20] build: set version to 0.2.0 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fc28f8a..864015a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "amqp-client-python" -version = "0.1.14" +version = "0.2.0" description = "Python AMQP Client Library" license = "Apache-2.0" authors = ["NUTES UEPB "] @@ -34,7 +34,7 @@ keywords = ["packaging", "dependency", "amqp-client-python"] [project] maintainers = [ - {name = "João Pedro M. Cariry", email = "berrytern@gmail.com"} + {name = "João Pedro Miranda C. Hluchan", email = "berrytern@gmail.com"} ] [project.urls] From d8fb933e7c10e3bd7b5460614d7f1535da0e40d8 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:45:25 -0300 Subject: [PATCH 17/20] docs: update async example of examples/ folder --- examples/async_case.py | 56 +++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/examples/async_case.py b/examples/async_case.py index 98f3a36..1995228 100644 --- a/examples/async_case.py +++ b/examples/async_case.py @@ -1,11 +1,8 @@ -from amqp_client_python import ( - AsyncEventbusRabbitMQ, - Config, Options -) -from amqp_client_python.event import IntegrationEvent, AsyncSubscriberHandler +from amqp_client_python import AsyncEventbusRabbitMQ, Config, Options from default import queue, rpc_queue, rpc_exchange + # from uvloop import new_event_loop # better performance, no windows-OS support -from asyncio import new_event_loop # great performance, great OS compatibility +from asyncio import new_event_loop # great performance, great OS compatibility loop = new_event_loop() @@ -15,53 +12,56 @@ ) -async def handle(*body): +async def handle(body): print(f"body: {body}") - response = await eventbus.rpc_client(rpc_exchange, "user.find2", ["content_message"]) + response = await eventbus.rpc_client( + rpc_exchange, "user.find2", ["content_message"] + ) print("...") return response -async def handle2(*body): + +async def handle2(body): print(f"body: {body}") return b"here" -async def handle3(*body): + +async def handle3(body): print(body) -class ExampleEventHandler(AsyncSubscriberHandler): - event_type = rpc_exchange - async def handle(self, body) -> None: - print(body) - -class ExampleEvent(IntegrationEvent): - EVENT_NAME: str = "NAME" - def __init__(self, event_type: str, message = []) -> None: - super().__init__(self.EVENT_NAME, event_type) - self.message = message +async def subscribe_handler(body) -> None: + print(body) -publish_event = ExampleEvent(rpc_exchange, ["message"]) -event_handle = ExampleEventHandler() # rpc_client call inside rpc_provider -#if __name__ == "__main__": -eventbus = AsyncEventbusRabbitMQ(config, loop, rpc_client_publisher_confirms=True, rpc_server_publisher_confirms=False, rpc_server_auto_ack=False) +# if __name__ == "__main__": +eventbus = AsyncEventbusRabbitMQ( + config, + loop, + rpc_client_publisher_confirms=True, + rpc_server_publisher_confirms=False, + rpc_server_auto_ack=False, +) + async def run(): await eventbus.provide_resource("user.find", handle) await eventbus.provide_resource("user.find2", handle2) - await eventbus.subscribe(publish_event, event_handle,"user.find3") + await eventbus.subscribe(rpc_exchange, "user.find3", subscribe_handler) count = 0 running = True while running: try: count += 1 - result = await eventbus.rpc_client(rpc_exchange, "user.find", ["content_message"]) + result = await eventbus.rpc_client( + rpc_exchange, "user.find", ["content_message"] + ) print("returned:", result) - await eventbus.publish(publish_event, "user.find3", ["content_message"]) + await eventbus.publish(rpc_exchange, "user.find3", ["content_message"]) except BaseException as err: print(f"err: {err}") loop.create_task(run()) -loop.run_forever() \ No newline at end of file +loop.run_forever() From d251e3ce95e1edcd6ee11c7860a724efb5cd7a45 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 18:49:49 -0300 Subject: [PATCH 18/20] docs: update sync wrapper example of examples/ folder --- README.md | 6 +++--- examples/async_case.py | 4 ++-- examples/sync_wrapper_case.py | 27 +++++++-------------------- 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index f22551c..2c318db 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ config = Config(Options("queue", "rpc_queue", "rpc_exchange")) eventbus = AsyncEventbusRabbitMQ(config) # publish -eventbus.publish("rpc_exchange", "routing.key", "direct") +eventbus.publish("rpc_exchange", "routing.key", "message_content") # subscribe async def subscribe_handler(body) -> None: print(body, type(body), flush=True) # handle messages @@ -130,7 +130,7 @@ while running: count += 1 if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).decode("utf-8"): running = False - #eventbus.publish(publish_event, rpc_routing_key, "direct") + #eventbus.publish(publish_event, rpc_routing_key, "message_content") #running = False except TimeoutError as err: print("timeout!!!: ", str(err)) @@ -168,7 +168,7 @@ while running: # rpc_client call eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8") # publish - eventbus.publish("rpc_exchange", "routing.key", "direct").result() + eventbus.publish("rpc_exchange", "routing.key", "message_content").result() #running = False except KeyboardInterrupt: running=False diff --git a/examples/async_case.py b/examples/async_case.py index 1995228..eaabcd5 100644 --- a/examples/async_case.py +++ b/examples/async_case.py @@ -55,10 +55,10 @@ async def run(): try: count += 1 result = await eventbus.rpc_client( - rpc_exchange, "user.find", ["content_message"] + rpc_exchange, "user.find", ["message_content"] ) print("returned:", result) - await eventbus.publish(rpc_exchange, "user.find3", ["content_message"]) + await eventbus.publish(rpc_exchange, "user.find3", ["message_content"]) except BaseException as err: print(f"err: {err}") diff --git a/examples/sync_wrapper_case.py b/examples/sync_wrapper_case.py index 5f5964a..3954159 100644 --- a/examples/sync_wrapper_case.py +++ b/examples/sync_wrapper_case.py @@ -1,5 +1,4 @@ -from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options, SSLOptions -from amqp_client_python.event import IntegrationEvent, AsyncSubscriberHandler +from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options # , SSLOptions from default import queue, rpc_queue, rpc_exchange, rpc_routing_key @@ -10,17 +9,8 @@ eventbus = EventbusWrapperRabbitMQ(config=config) -class ExampleEvent(IntegrationEvent): - EVENT_NAME: str = "ExampleEvent" - - def __init__(self, event_type: str, message=[]) -> None: - super().__init__(self.EVENT_NAME, event_type) - self.message = message - - -class ExampleEventHandler(AsyncSubscriberHandler): - async def handle(self, body) -> None: - print(body, "subscribe") +async def subscribe_handler(body) -> None: + print(body, "subscribe") async def handle(*body): @@ -37,10 +27,7 @@ async def handle2(*body): return f"{body[0]}".encode("utf-8") -subscribe_event = ExampleEvent(rpc_exchange) -publish_event = ExampleEvent(rpc_exchange, ["message"]) -subscribe_event_handle = ExampleEventHandler() -eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key).result() +eventbus.subscribe(rpc_exchange, rpc_routing_key, subscribe_handler).result() eventbus.provide_resource(rpc_routing_key + "2", handle).result() eventbus.provide_resource(rpc_routing_key + "3", handle2).result() count = 0 @@ -49,10 +36,10 @@ async def handle2(*body): try: count += 1 if str(count) != eventbus.rpc_client( - rpc_exchange, rpc_routing_key + "2", [f"{count}"] + rpc_exchange, rpc_routing_key + "2", count ).result().decode("utf-8"): running = False - eventbus.publish(publish_event, rpc_routing_key, "direct").result() + eventbus.publish(rpc_exchange, rpc_routing_key, "message_content").result() # running = False except KeyboardInterrupt: running = False @@ -60,4 +47,4 @@ async def handle2(*body): running = False print("Err:", err) -eventbus.dispose() \ No newline at end of file +eventbus.dispose() From 98859fa41d4423e77ae1e38ff6856b55c26c18e7 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 19:06:21 -0300 Subject: [PATCH 19/20] chore: add preliminary compatibility table --- README.md | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 2c318db..08fa3b4 100644 --- a/README.md +++ b/README.md @@ -31,19 +31,11 @@ Client with high level of abstraction for manipulation of messages in the event - Support for a Remote procedure call _(RPC)_. -[//]: # (These are reference links used in the body of this note.) -[license-image]: https://img.shields.io/badge/license-Apache%202-blue.svg -[license-url]: https://github.com/nutes-uepb/amqp-client-python/blob/master/LICENSE -[npm-image]: https://img.shields.io/npm/v/amqp-client-python.svg?color=red&logo=npm -[npm-url]: https://npmjs.org/package/amqp-client-python -[downloads-image]: https://img.shields.io/npm/dt/amqp-client-python.svg?logo=npm -[travis-url]: https://travis-ci.org/nutes-uepb/amqp-client-python -[coverage-image]: https://coveralls.io/repos/github/nutes-uepb/amqp-client-python/badge.svg -[coverage-url]: https://coveralls.io/github/nutes-uepb/amqp-client-python?branch=master -[known-vulnerabilities-image]: https://snyk.io/test/github/nutes-uepb/amqp-client-python/badge.svg?targetFile=requirements.txt -[known-vulnerabilities-url]: https://snyk.io/test/github/nutes-uepb/amqp-client-python?targetFile=requirements.txt -[releases-image]: https://img.shields.io/github/release-date/nutes-uepb/amqp-client-python.svg -[releases-url]: https://github.com/nutes-uepb/amqp-client-python/releases +### Table of Compatibility +| version | compatible with | +| ---- | ---- | +| 0.2.0 | 0.2.0 | +| 0.1.14 | ~0.1.12 | ### Examples: #### you can use [sync](https://github.com/nutes-uepb/amqp-client-python/blob/develop/amqp_client_python/rabbitmq/eventbus_rabbitmq.py) , [async eventbus](https://github.com/nutes-uepb/amqp-client-python/blob/develop/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py) and [sync wrapper](https://github.com/nutes-uepb/amqp-client-python/blob/develop/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py) of async eventbus @@ -192,3 +184,17 @@ When using [**EventbusRabbitMQ**](https://github.com/nutes-uepb/amqp-client-pyth + +[//]: # (These are reference links used in the body of this note.) +[license-image]: https://img.shields.io/badge/license-Apache%202-blue.svg +[license-url]: https://github.com/nutes-uepb/amqp-client-python/blob/master/LICENSE +[npm-image]: https://img.shields.io/npm/v/amqp-client-python.svg?color=red&logo=npm +[npm-url]: https://npmjs.org/package/amqp-client-python +[downloads-image]: https://img.shields.io/npm/dt/amqp-client-python.svg?logo=npm +[travis-url]: https://travis-ci.org/nutes-uepb/amqp-client-python +[coverage-image]: https://coveralls.io/repos/github/nutes-uepb/amqp-client-python/badge.svg +[coverage-url]: https://coveralls.io/github/nutes-uepb/amqp-client-python?branch=master +[known-vulnerabilities-image]: https://snyk.io/test/github/nutes-uepb/amqp-client-python/badge.svg?targetFile=requirements.txt +[known-vulnerabilities-url]: https://snyk.io/test/github/nutes-uepb/amqp-client-python?targetFile=requirements.txt +[releases-image]: https://img.shields.io/github/release-date/nutes-uepb/amqp-client-python.svg +[releases-url]: https://github.com/nutes-uepb/amqp-client-python/releases \ No newline at end of file From ec34c6148006897d0c413d07912f1117c21007a7 Mon Sep 17 00:00:00 2001 From: berrytern Date: Wed, 2 Oct 2024 19:40:50 -0300 Subject: [PATCH 20/20] build: add PYPI publish pipeline --- .github/workflows/publish.yml | 24 ++++++++++++++++++++++++ README.md | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..e512c6a --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,24 @@ +name: Publish to PyPI + +on: + release: + types: [created] + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + - name: Install Poetry + uses: snok/install-poetry@v1 + - name: Build and publish + env: + PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} + run: | + poetry config pypi-token.pypi $PYPI_TOKEN + poetry build + poetry publish \ No newline at end of file diff --git a/README.md b/README.md index 08fa3b4..390cf71 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Client with high level of abstraction for manipulation of messages in the event ### Table of Compatibility | version | compatible with | | ---- | ---- | -| 0.2.0 | 0.2.0 | +| 0.2.0 | 0.2.0 | | 0.1.14 | ~0.1.12 | ### Examples: