Skip to content

Commit

Permalink
Feat: init ping method (#1592)
Browse files Browse the repository at this point in the history
* Feat: init ping method, add rabbit test ping

* Fix: add timeout test, change logic ping

* Feat: add check redis connection

* lint code, add ping for nats and kafka

* Feat: add confluent ping

* Fix: clear code

* Fix: lint

* Fix: run lint.sh

* refactor: unify ping method code

* refactor: correct FastKafkaProducer signature

* refactor: unify Confluent ping method code

* Add ping method to async confluent producer and fix confluent ping method

* refactor: use Confluent producer to ping

* lint: explicit kwargs type

* refactor: remove concrete topic asking

---------

Co-authored-by: Daniil Dumchenko <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
Co-authored-by: Pastukhov Nikita <[email protected]>
Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
5 people authored Jul 17, 2024
1 parent 5e37604 commit 0a13918
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 38 deletions.
5 changes: 5 additions & 0 deletions faststream/broker/core/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,8 @@ async def publish(
publish = partial(m(None).publish_scope, publish)

return await publish(msg, **kwargs)

@abstractmethod
async def ping(self, timeout: Optional[float]) -> bool:
"""Check connection alive."""
raise NotImplementedError()
12 changes: 12 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
Union,
)

from anyio import move_on_after
from typing_extensions import Annotated, Doc, override

from faststream.__about__ import SERVICE_NAME
Expand Down Expand Up @@ -523,3 +524,14 @@ async def publish_batch(
reply_to=reply_to,
correlation_id=correlation_id,
)

@override
async def ping(self, timeout: Optional[float]) -> bool:
with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._producer is None:
return False

return await self._producer._producer.ping(timeout=timeout)
48 changes: 33 additions & 15 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@

_missing = object()

ADMINCLIENT_CONFIG_PARAMS = (
"allow.auto.create.topics",
"bootstrap.servers",
"client.id",
"request.timeout.ms",
"metadata.max.age.ms",
"security.protocol",
"connections.max.idle.ms",
"sasl.mechanism",
"sasl.username",
"sasl.password",
)


class MsgToSend(BaseModel):
"""A Pydantic model representing a message to be sent to Kafka.
Expand Down Expand Up @@ -214,6 +227,25 @@ async def send_batch(
]
await asyncio.gather(*tasks)

async def ping(
self,
timeout: Optional[float] = 5.0,
) -> bool:
"""Implement ping using `list_topics` information request."""
if timeout is None:
timeout = -1

try:
cluster_metadata = await call_or_await(
self.producer.list_topics,
timeout=timeout,
)

return bool(cluster_metadata)

except Exception:
return False


class TopicPartition(NamedTuple):
"""A named tuple representing a Kafka topic and partition."""
Expand All @@ -228,22 +260,8 @@ def create_topics(
logger: Union["LoggerProto", None, object] = logger,
) -> None:
"""Creates Kafka topics using the provided configuration."""
required_config_params = (
"allow.auto.create.topics",
"bootstrap.servers",
"client.id",
"request.timeout.ms",
"metadata.max.age.ms",
"security.protocol",
"connections.max.idle.ms",
"sasl.mechanism",
"sasl.username",
"sasl.password",
"sasl.kerberos.service.name",
)

admin_client = AdminClient(
{x: config[x] for x in required_config_params if x in config}
{x: config[x] for x in ADMINCLIENT_CONFIG_PARAMS if x in config}
)

fs = admin_client.create_topics(
Expand Down
10 changes: 1 addition & 9 deletions faststream/confluent/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from faststream.broker.message import encode_message
from faststream.broker.publisher.proto import ProducerProto
from faststream.exceptions import NOT_CONNECTED_YET

if TYPE_CHECKING:
from faststream.confluent.client import AsyncConfluentProducer
Expand All @@ -14,8 +13,6 @@
class AsyncConfluentFastProducer(ProducerProto):
"""A class to represent Kafka producer."""

_producer: Optional["AsyncConfluentProducer"]

def __init__(
self,
producer: "AsyncConfluentProducer",
Expand All @@ -36,8 +33,6 @@ async def publish( # type: ignore[override]
reply_to: str = "",
) -> None:
"""Publish a message to a topic."""
assert self._producer, NOT_CONNECTED_YET # nosec B101

message, content_type = encode_message(message)

headers_to_send = {
Expand All @@ -62,8 +57,7 @@ async def publish( # type: ignore[override]
)

async def stop(self) -> None:
if self._producer is not None: # pragma: no branch
await self._producer.stop()
await self._producer.stop()

async def publish_batch(
self,
Expand All @@ -76,8 +70,6 @@ async def publish_batch(
correlation_id: str = "",
) -> None:
"""Publish a batch of messages to a topic."""
assert self._producer, NOT_CONNECTED_YET # nosec B101

batch = self._producer.create_batch()

headers_to_send = {"correlation_id": correlation_id, **(headers or {})}
Expand Down
12 changes: 12 additions & 0 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import aiokafka
from aiokafka.partitioner import DefaultPartitioner
from aiokafka.producer.producer import _missing
from anyio import move_on_after
from typing_extensions import Annotated, Doc, override

from faststream.__about__ import SERVICE_NAME
Expand Down Expand Up @@ -803,3 +804,14 @@ async def publish_batch(
reply_to=reply_to,
correlation_id=correlation_id,
)

@override
async def ping(self, timeout: Optional[float]) -> bool:
with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._producer is None:
return False

return not self._producer._producer._closed
10 changes: 1 addition & 9 deletions faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from faststream.broker.message import encode_message
from faststream.broker.publisher.proto import ProducerProto
from faststream.exceptions import NOT_CONNECTED_YET

if TYPE_CHECKING:
from aiokafka import AIOKafkaProducer
Expand All @@ -15,8 +14,6 @@
class AioKafkaFastProducer(ProducerProto):
"""A class to represent Kafka producer."""

_producer: Optional["AIOKafkaProducer"]

def __init__(
self,
producer: "AIOKafkaProducer",
Expand All @@ -37,8 +34,6 @@ async def publish( # type: ignore[override]
reply_to: str = "",
) -> None:
"""Publish a message to a topic."""
assert self._producer, NOT_CONNECTED_YET # nosec B101

message, content_type = encode_message(message)

headers_to_send = {
Expand All @@ -63,8 +58,7 @@ async def publish( # type: ignore[override]
)

async def stop(self) -> None:
if self._producer is not None: # pragma: no branch
await self._producer.stop()
await self._producer.stop()

async def publish_batch(
self,
Expand All @@ -77,8 +71,6 @@ async def publish_batch(
reply_to: str = "",
) -> None:
"""Publish a batch of messages to a topic."""
assert self._producer, NOT_CONNECTED_YET # nosec B101

batch = self._producer.create_batch()

headers_to_send = {"correlation_id": correlation_id, **(headers or {})}
Expand Down
12 changes: 12 additions & 0 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

import nats
from anyio import move_on_after
from nats.aio.client import (
DEFAULT_CONNECT_TIMEOUT,
DEFAULT_DRAIN_TIMEOUT,
Expand Down Expand Up @@ -914,3 +915,14 @@ async def new_inbox(self) -> str:
assert self._connection # nosec B101

return self._connection.new_inbox()

@override
async def ping(self, timeout: Optional[float]) -> bool:
with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._connection is None:
return False

return self._connection.is_connected
12 changes: 12 additions & 0 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from urllib.parse import urlparse

from aio_pika import connect_robust
from anyio import move_on_after
from typing_extensions import Annotated, Doc, override

from faststream.__about__ import SERVICE_NAME
Expand Down Expand Up @@ -655,3 +656,14 @@ async def declare_exchange(
"""Declares exchange object in **RabbitMQ**."""
assert self.declarer, NOT_CONNECTED_YET # nosec B101
return await self.declarer.declare_exchange(exchange)

@override
async def ping(self, timeout: Optional[float]) -> bool:
with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._connection is None:
return False

return not self._connection.is_closed
12 changes: 12 additions & 0 deletions faststream/redis/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from urllib.parse import urlparse

from anyio import move_on_after
from redis.asyncio.client import Redis
from redis.asyncio.connection import (
Connection,
Expand Down Expand Up @@ -477,3 +478,14 @@ async def publish_batch(
list=list,
correlation_id=correlation_id,
)

@override
async def ping(self, timeout: Optional[float]) -> bool:
with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._connection is None:
return False

return await self._connection.ping()
11 changes: 10 additions & 1 deletion tests/brokers/base/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class BrokerConnectionTestcase:
def get_broker_args(self, settings):
return {}

@pytest.mark.asyncio()
async def ping(self, broker) -> bool:
return True
return await broker.ping(timeout=5.0)

@pytest.mark.asyncio()
async def test_close_before_start(self, async_mock):
Expand Down Expand Up @@ -46,3 +47,11 @@ async def test_connect_by_url_priority(self, settings):
await broker.connect(**kwargs)
assert await self.ping(broker)
await broker.close()

@pytest.mark.asyncio()
async def test_ping_timeout(self, settings):
kwargs = self.get_broker_args(settings)
broker = self.broker("wrong_url")
await broker.connect(**kwargs)
assert not await broker.ping(timeout=0.00001)
await broker.close()
4 changes: 0 additions & 4 deletions tests/brokers/redis/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def get_broker_args(self, settings):
"port": settings.port,
}

async def ping(self, broker: RedisBroker) -> bool:
await broker._connection.ping()
return True

@pytest.mark.asyncio()
async def test_init_connect_by_raw_data(self, settings):
async with RedisBroker(
Expand Down

0 comments on commit 0a13918

Please sign in to comment.