diff --git a/CHANGELOG.md b/CHANGELOG.md index 93ffa64..bfa191e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Implement `len(client.messages)` to return number of messages in queue (@empicano in #323) +- Add support for application-controlled acknowledgements (@ale-rinaldi in #346) ### Changed diff --git a/aiomqtt/client.py b/aiomqtt/client.py index a19cb91..7030f84 100644 --- a/aiomqtt/client.py +++ b/aiomqtt/client.py @@ -203,6 +203,9 @@ class Client: socket_options: Options to pass to the underlying socket. websocket_path: The path to use for websockets. websocket_headers: The headers to use for websockets. + manual_ack: If set to ``True``, the client will not automatically acknowledge + messages. Instead, the user must manually acknowledge messages using the + ``ack`` method. """ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915 @@ -236,6 +239,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915 socket_options: Iterable[SocketOption] | None = None, websocket_path: str | None = None, websocket_headers: WebSocketHeaders | None = None, + manual_ack: bool = False, ) -> None: self._hostname = hostname self._port = port @@ -285,6 +289,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915 clean_session=clean_session, transport=transport, reconnect_on_failure=False, + manual_ack=manual_ack, ) self._client.on_connect = self._on_connect self._client.on_disconnect = self._on_disconnect @@ -410,6 +415,27 @@ async def subscribe( # noqa: PLR0913 # Wait for callback_result return await self._wait_for(callback_result, timeout=timeout) + @_outgoing_call + async def ack( + self, + /, + message: Message, + *args: Any, + **kwargs: Any, + ) -> None: + """Sends an acknowledgement for a given message. Only useful in QoS>=1 and manual_ack=True (option of Client) + + Args: + message: The message to acknowledge. + *args: Additional positional arguments to pass to paho-mqtt's ack + method. + **kwargs: Additional keyword arguments to pass to paho-mqtt's ack + method. + """ + result = self._client.ack(message.mid, message.qos, *args, **kwargs) + if result != mqtt.MQTT_ERR_SUCCESS: + raise MqttCodeError(result, "Could not ack message") + @_outgoing_call async def unsubscribe( self,