Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application-controlled acknowledgements support #346

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added

- Implement `len(client.messages)` to return number of messages in queue (@empicano in #323)
- Add support for application-controlled acknowledgements (@ale-rinaldi in #346)

### Changed

Expand Down
26 changes: 26 additions & 0 deletions aiomqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ class Client:
socket_options: Options to pass to the underlying socket.
websocket_path: The path to use for websockets.
websocket_headers: The headers to use for websockets.
manual_ack: If set to ``True``, the client will not automatically acknowledge
messages. Instead, the user must manually acknowledge messages using the
``ack`` method.
"""

def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915
Expand Down Expand Up @@ -236,6 +239,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915
socket_options: Iterable[SocketOption] | None = None,
websocket_path: str | None = None,
websocket_headers: WebSocketHeaders | None = None,
manual_ack: bool = False,
) -> None:
self._hostname = hostname
self._port = port
Expand Down Expand Up @@ -285,6 +289,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915
clean_session=clean_session,
transport=transport,
reconnect_on_failure=False,
manual_ack=manual_ack,
)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
Expand Down Expand Up @@ -410,6 +415,27 @@ async def subscribe( # noqa: PLR0913
# Wait for callback_result
return await self._wait_for(callback_result, timeout=timeout)

@_outgoing_call
async def ack(
self,
/,
message: Message,
*args: Any,
**kwargs: Any,
) -> None:
"""Sends an acknowledgement for a given message. Only useful in QoS>=1 and manual_ack=True (option of Client)

Args:
message: The message to acknowledge.
*args: Additional positional arguments to pass to paho-mqtt's ack
method.
**kwargs: Additional keyword arguments to pass to paho-mqtt's ack
method.
"""
result = self._client.ack(message.mid, message.qos, *args, **kwargs)
if result != mqtt.MQTT_ERR_SUCCESS:
raise MqttCodeError(result, "Could not ack message")

@_outgoing_call
async def unsubscribe(
self,
Expand Down