From 0a1391891b31988ff4e0d70b628c0a14b0e0b1d6 Mon Sep 17 00:00:00 2001 From: Flosckow <66554425+Flosckow@users.noreply.github.com> Date: Wed, 17 Jul 2024 23:58:37 +0700 Subject: [PATCH] Feat: init ping method (#1592) * Feat: init ping method, add rabbit test ping * Fix: add timeout test, change logic ping * Feat: add check redis connection * lint code, add ping for nats and kafka * Feat: add confluent ping * Fix: clear code * Fix: lint * Fix: run lint.sh * refactor: unify ping method code * refactor: correct FastKafkaProducer signature * refactor: unify Confluent ping method code * Add ping method to async confluent producer and fix confluent ping method * refactor: use Confluent producer to ping * lint: explicit kwargs type * refactor: remove concrete topic asking --------- Co-authored-by: Daniil Dumchenko Co-authored-by: Nikita Pastukhov Co-authored-by: Pastukhov Nikita Co-authored-by: Kumaran Rajendhiran --- faststream/broker/core/usecase.py | 5 +++ faststream/confluent/broker/broker.py | 12 ++++++ faststream/confluent/client.py | 48 +++++++++++++++------- faststream/confluent/publisher/producer.py | 10 +---- faststream/kafka/broker/broker.py | 12 ++++++ faststream/kafka/publisher/producer.py | 10 +---- faststream/nats/broker/broker.py | 12 ++++++ faststream/rabbit/broker/broker.py | 12 ++++++ faststream/redis/broker/broker.py | 12 ++++++ tests/brokers/base/connection.py | 11 ++++- tests/brokers/redis/test_connect.py | 4 -- 11 files changed, 110 insertions(+), 38 deletions(-) diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index c226850ace..d8aae6064c 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -342,3 +342,8 @@ async def publish( publish = partial(m(None).publish_scope, publish) return await publish(msg, **kwargs) + + @abstractmethod + async def ping(self, timeout: Optional[float]) -> bool: + """Check connection alive.""" + raise NotImplementedError() diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 45c3f8fc1a..8d83719740 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -17,6 +17,7 @@ Union, ) +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -523,3 +524,14 @@ async def publish_batch( reply_to=reply_to, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + + if self._producer is None: + return False + + return await self._producer._producer.ping(timeout=timeout) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 74b8dddbe1..49d75b76de 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -26,6 +26,19 @@ _missing = object() +ADMINCLIENT_CONFIG_PARAMS = ( + "allow.auto.create.topics", + "bootstrap.servers", + "client.id", + "request.timeout.ms", + "metadata.max.age.ms", + "security.protocol", + "connections.max.idle.ms", + "sasl.mechanism", + "sasl.username", + "sasl.password", +) + class MsgToSend(BaseModel): """A Pydantic model representing a message to be sent to Kafka. @@ -214,6 +227,25 @@ async def send_batch( ] await asyncio.gather(*tasks) + async def ping( + self, + timeout: Optional[float] = 5.0, + ) -> bool: + """Implement ping using `list_topics` information request.""" + if timeout is None: + timeout = -1 + + try: + cluster_metadata = await call_or_await( + self.producer.list_topics, + timeout=timeout, + ) + + return bool(cluster_metadata) + + except Exception: + return False + class TopicPartition(NamedTuple): """A named tuple representing a Kafka topic and partition.""" @@ -228,22 +260,8 @@ def create_topics( logger: Union["LoggerProto", None, object] = logger, ) -> None: """Creates Kafka topics using the provided configuration.""" - required_config_params = ( - "allow.auto.create.topics", - "bootstrap.servers", - "client.id", - "request.timeout.ms", - "metadata.max.age.ms", - "security.protocol", - "connections.max.idle.ms", - "sasl.mechanism", - "sasl.username", - "sasl.password", - "sasl.kerberos.service.name", - ) - admin_client = AdminClient( - {x: config[x] for x in required_config_params if x in config} + {x: config[x] for x in ADMINCLIENT_CONFIG_PARAMS if x in config} ) fs = admin_client.create_topics( diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index 0989ba5644..99c75d32b7 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -4,7 +4,6 @@ from faststream.broker.message import encode_message from faststream.broker.publisher.proto import ProducerProto -from faststream.exceptions import NOT_CONNECTED_YET if TYPE_CHECKING: from faststream.confluent.client import AsyncConfluentProducer @@ -14,8 +13,6 @@ class AsyncConfluentFastProducer(ProducerProto): """A class to represent Kafka producer.""" - _producer: Optional["AsyncConfluentProducer"] - def __init__( self, producer: "AsyncConfluentProducer", @@ -36,8 +33,6 @@ async def publish( # type: ignore[override] reply_to: str = "", ) -> None: """Publish a message to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - message, content_type = encode_message(message) headers_to_send = { @@ -62,8 +57,7 @@ async def publish( # type: ignore[override] ) async def stop(self) -> None: - if self._producer is not None: # pragma: no branch - await self._producer.stop() + await self._producer.stop() async def publish_batch( self, @@ -76,8 +70,6 @@ async def publish_batch( correlation_id: str = "", ) -> None: """Publish a batch of messages to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - batch = self._producer.create_batch() headers_to_send = {"correlation_id": correlation_id, **(headers or {})} diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 36a5a3a213..9ce9e08dfe 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -19,6 +19,7 @@ import aiokafka from aiokafka.partitioner import DefaultPartitioner from aiokafka.producer.producer import _missing +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -803,3 +804,14 @@ async def publish_batch( reply_to=reply_to, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + + if self._producer is None: + return False + + return not self._producer._producer._closed diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 53ab290f0a..398dba7e4a 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -4,7 +4,6 @@ from faststream.broker.message import encode_message from faststream.broker.publisher.proto import ProducerProto -from faststream.exceptions import NOT_CONNECTED_YET if TYPE_CHECKING: from aiokafka import AIOKafkaProducer @@ -15,8 +14,6 @@ class AioKafkaFastProducer(ProducerProto): """A class to represent Kafka producer.""" - _producer: Optional["AIOKafkaProducer"] - def __init__( self, producer: "AIOKafkaProducer", @@ -37,8 +34,6 @@ async def publish( # type: ignore[override] reply_to: str = "", ) -> None: """Publish a message to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - message, content_type = encode_message(message) headers_to_send = { @@ -63,8 +58,7 @@ async def publish( # type: ignore[override] ) async def stop(self) -> None: - if self._producer is not None: # pragma: no branch - await self._producer.stop() + await self._producer.stop() async def publish_batch( self, @@ -77,8 +71,6 @@ async def publish_batch( reply_to: str = "", ) -> None: """Publish a batch of messages to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - batch = self._producer.create_batch() headers_to_send = {"correlation_id": correlation_id, **(headers or {})} diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index fefcc069a8..74fa1f0bdf 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -14,6 +14,7 @@ ) import nats +from anyio import move_on_after from nats.aio.client import ( DEFAULT_CONNECT_TIMEOUT, DEFAULT_DRAIN_TIMEOUT, @@ -914,3 +915,14 @@ async def new_inbox(self) -> str: assert self._connection # nosec B101 return self._connection.new_inbox() + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + + if self._connection is None: + return False + + return self._connection.is_connected diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 6cb357fef7..c12ec28bae 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -13,6 +13,7 @@ from urllib.parse import urlparse from aio_pika import connect_robust +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -655,3 +656,14 @@ async def declare_exchange( """Declares exchange object in **RabbitMQ**.""" assert self.declarer, NOT_CONNECTED_YET # nosec B101 return await self.declarer.declare_exchange(exchange) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + + if self._connection is None: + return False + + return not self._connection.is_closed diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 4f30e8adfb..aebfc1add9 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -13,6 +13,7 @@ ) from urllib.parse import urlparse +from anyio import move_on_after from redis.asyncio.client import Redis from redis.asyncio.connection import ( Connection, @@ -477,3 +478,14 @@ async def publish_batch( list=list, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + + if self._connection is None: + return False + + return await self._connection.ping() diff --git a/tests/brokers/base/connection.py b/tests/brokers/base/connection.py index 9455048669..3307698f61 100644 --- a/tests/brokers/base/connection.py +++ b/tests/brokers/base/connection.py @@ -11,8 +11,9 @@ class BrokerConnectionTestcase: def get_broker_args(self, settings): return {} + @pytest.mark.asyncio() async def ping(self, broker) -> bool: - return True + return await broker.ping(timeout=5.0) @pytest.mark.asyncio() async def test_close_before_start(self, async_mock): @@ -46,3 +47,11 @@ async def test_connect_by_url_priority(self, settings): await broker.connect(**kwargs) assert await self.ping(broker) await broker.close() + + @pytest.mark.asyncio() + async def test_ping_timeout(self, settings): + kwargs = self.get_broker_args(settings) + broker = self.broker("wrong_url") + await broker.connect(**kwargs) + assert not await broker.ping(timeout=0.00001) + await broker.close() diff --git a/tests/brokers/redis/test_connect.py b/tests/brokers/redis/test_connect.py index 305fee821f..cd10fe8ca0 100644 --- a/tests/brokers/redis/test_connect.py +++ b/tests/brokers/redis/test_connect.py @@ -15,10 +15,6 @@ def get_broker_args(self, settings): "port": settings.port, } - async def ping(self, broker: RedisBroker) -> bool: - await broker._connection.ping() - return True - @pytest.mark.asyncio() async def test_init_connect_by_raw_data(self, settings): async with RedisBroker(