diff --git a/.doctrees/alongside-fastapi-and-co.doctree b/.doctrees/alongside-fastapi-and-co.doctree index 11829e6..6949293 100644 Binary files a/.doctrees/alongside-fastapi-and-co.doctree and b/.doctrees/alongside-fastapi-and-co.doctree differ diff --git a/.doctrees/connecting-to-the-broker.doctree b/.doctrees/connecting-to-the-broker.doctree index c3b5509..3df3cd9 100644 Binary files a/.doctrees/connecting-to-the-broker.doctree and b/.doctrees/connecting-to-the-broker.doctree differ diff --git a/.doctrees/developer-interface.doctree b/.doctrees/developer-interface.doctree index a891cb6..7d7865b 100644 Binary files a/.doctrees/developer-interface.doctree and b/.doctrees/developer-interface.doctree differ diff --git a/.doctrees/environment.pickle b/.doctrees/environment.pickle index eb8cc63..c0f7f16 100644 Binary files a/.doctrees/environment.pickle and b/.doctrees/environment.pickle differ diff --git a/.doctrees/index.doctree b/.doctrees/index.doctree index ae86684..6e8c9e1 100644 Binary files a/.doctrees/index.doctree and b/.doctrees/index.doctree differ diff --git a/.doctrees/migration-guide-v2.doctree b/.doctrees/migration-guide-v2.doctree new file mode 100644 index 0000000..92d64d8 Binary files /dev/null and b/.doctrees/migration-guide-v2.doctree differ diff --git a/.doctrees/reconnection.doctree b/.doctrees/reconnection.doctree index c448b82..9aaefa5 100644 Binary files a/.doctrees/reconnection.doctree and b/.doctrees/reconnection.doctree differ diff --git a/.doctrees/subscribing-to-a-topic.doctree b/.doctrees/subscribing-to-a-topic.doctree index efa5c44..ae1b328 100644 Binary files a/.doctrees/subscribing-to-a-topic.doctree and b/.doctrees/subscribing-to-a-topic.doctree differ diff --git a/_sources/alongside-fastapi-and-co.md.txt b/_sources/alongside-fastapi-and-co.md.txt index c390c1a..14841a9 100644 --- a/_sources/alongside-fastapi-and-co.md.txt +++ b/_sources/alongside-fastapi-and-co.md.txt @@ -12,10 +12,8 @@ import fastapi async def listen(client): - async with client.messages() as messages: - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + async for message in client.messages: + print(message.payload) client = None @@ -28,6 +26,7 @@ async def lifespan(app): # Make client globally available client = c # Listen for MQTT messages in (unawaited) asyncio task + await client.subscribe("humidity/#") loop = asyncio.get_event_loop() task = loop.create_task(listen(client)) yield diff --git a/_sources/connecting-to-the-broker.md.txt b/_sources/connecting-to-the-broker.md.txt index c106b3a..9e2578b 100644 --- a/_sources/connecting-to-the-broker.md.txt +++ b/_sources/connecting-to-the-broker.md.txt @@ -20,7 +20,7 @@ The connection to the broker is managed by the `Client` context manager. This co Context managers make it easier to manage resources like network connections or files by ensuring that their teardown logic is always executed -- even in case of an exception. ```{tip} -If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods directly as a workaround, similar to how you would use manual `connect` and `disconnect` methods. With this approach you need to make sure that `___aexit___` is also called in case of an exception. Avoid this workaround if you can, it's a bit tricky to get right. +If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods to connect and disconnect as a workaround. With this approach you need to ensure yourself that `___aexit___` is also called in case of an exception. Avoid this workaround if you can, it's a bit tricky to get right. ``` ```{note} diff --git a/_sources/index.md.txt b/_sources/index.md.txt index 167ef6e..d43600d 100644 --- a/_sources/index.md.txt +++ b/_sources/index.md.txt @@ -27,10 +27,11 @@ alongside-fastapi-and-co ``` ```{toctree} -:caption: API reference +:caption: reference :hidden: developer-interface +migration-guide-v2 ``` ```{toctree} @@ -39,7 +40,7 @@ developer-interface GitHub Issue tracker -Discussions +Changelog Contributing PyPI ``` diff --git a/_sources/migration-guide-v2.md.txt b/_sources/migration-guide-v2.md.txt new file mode 100644 index 0000000..f31e3fa --- /dev/null +++ b/_sources/migration-guide-v2.md.txt @@ -0,0 +1,154 @@ +# Migration guide: v2.0.0 + +Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are: + +- The deprecated `connect` and `disconnect` methods have been removed +- The deprecated `filtered_messages` and `unfiltered_messages` methods have been removed +- User-managed queues for incoming messages have been replaced with a single client-wide queue +- Some arguments to the `Client` have been renamed or removed + +## Changes to the client lifecycle + +The deprecated `connect` and `disconnect` methods have been removed. The best way to connect and disconnect from the broker is through the client's context manager: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.publish("temperature/outside", payload=28.4) + + +asyncio.run(main()) +``` + +If your use case does not allow you to use a context manager, you can use the client’s `__aenter__` and `__aexit__` methods almost interchangeably in place of the removed `connect` and `disconnect` methods. + +The `__aenter__` and `__aexit__` methods are designed to be called by the `async with` statement when the execution enters and exits the context manager. However, we can also execute them manually: + +```python +import asyncio +import aiomqtt + + +async def main(): + client = aiomqtt.Client("test.mosquitto.org") + await client.__aenter__() + try: + await client.publish("temperature/outside", payload=28.4) + finally: + await client.__aexit__(None, None, None) + + +asyncio.run(main()) +``` + +`__aenter__` is equivalent to `connect`. `__aexit__` is equivalent to `disconnect` except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly. + +```{note} +`__aexit__` expects three arguments: `exc_type`, `exc`, and `tb`. These arguments describe the exception that caused the context manager to exit, if any. You can pass `None` to all of these arguments in a manual call to `__aexit__`. +``` + +## Changes to the message queue + +The `filtered_messages`, `unfiltered_messages`, and `messages` methods have been removed and replaced with a single client-wide message queue. + +A minimal example of printing all messages (unfiltered) looks like this: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) + + +asyncio.run(main()) +``` + +To handle messages from different topics differently, we can use `Topic.matches()`: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + if message.topic.matches("humidity/inside"): + print(f"[humidity/inside] {message.payload}") + if message.topic.matches("+/outside"): + print(f"[+/outside] {message.payload}") + if message.topic.matches("temperature/#"): + print(f"[temperature/#] {message.payload}") + + +asyncio.run(main()) +``` + +```{note} +In our example, messages to `temperature/outside` are handled twice! +``` + +The `filtered_messages`, `unfiltered_messages`, and `messages` methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via `Client.messages`. + +If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a "distributor" on top: + +```python +import asyncio +import aiomqtt + + +async def temperature_consumer(): + while True: + message = await temperature_queue.get() + print(f"[temperature/#] {message.payload}") + + +async def humidity_consumer(): + while True: + message = await humidity_queue.get() + print(f"[humidity/#] {message.payload}") + + +temperature_queue = asyncio.Queue() +humidity_queue = asyncio.Queue() + + +async def distributor(client): + # Sort messages into the appropriate queues + async for message in client.messages: + if message.topic.matches("temperature/#"): + temperature_queue.put_nowait(message) + elif message.topic.matches("humidity/#"): + humidity_queue.put_nowait(message) + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + # Use a task group to manage and await all tasks + async with asyncio.TaskGroup() as tg: + tg.create_task(distributor(client)) + tg.create_task(temperature_consumer()) + tg.create_task(humidity_consumer()) + + +asyncio.run(main()) +``` + +## Changes to client arguments + +- The `queue_class` and `queue_maxsize` arguments to `filtered_messages`, `unfiltered_messages`, and `messages` have been moved to the `Client` and have been renamed to `queue_type` and `max_queued_incoming_messages` +- The `max_queued_messages` client argument has been renamed to `max_queued_outgoing_messages` +- The deprecated `message_retry_set` client argument has been removed diff --git a/_sources/reconnection.md.txt b/_sources/reconnection.md.txt index 9ea92fb..c6b754a 100644 --- a/_sources/reconnection.md.txt +++ b/_sources/reconnection.md.txt @@ -17,10 +17,9 @@ async def main(): while True: try: async with client: - async with client.messages() as messages: - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + await client.subscribe("humidity/#") + async for message in client.messages: + print(message.payload) except aiomqtt.MqttError: print(f"Connection lost; Reconnecting in {interval} seconds ...") await asyncio.sleep(interval) diff --git a/_sources/subscribing-to-a-topic.md.txt b/_sources/subscribing-to-a-topic.md.txt index bee45c1..74212af 100644 --- a/_sources/subscribing-to-a-topic.md.txt +++ b/_sources/subscribing-to-a-topic.md.txt @@ -1,6 +1,6 @@ # Subscribing to a topic -To receive messages for a topic, we need to subscribe to it and listen for messages. This is a minimal working example that listens for messages to the `temperature/#` wildcard: +To receive messages for a topic, we need to subscribe to it. Incoming messages are queued internally. You can use the `Client.message` generator to iterate over incoming messages. This is a minimal working example that listens for messages to the `temperature/#` wildcard: ```python import asyncio @@ -9,10 +9,9 @@ import aiomqtt async def main(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) asyncio.run(main()) @@ -37,16 +36,15 @@ import aiomqtt async def main(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - async for message in messages: - if message.topic.matches("humidity/inside"): - print(f"[humidity/outside] {message.payload}") - if message.topic.matches("+/outside"): - print(f"[+/inside] {message.payload}") - if message.topic.matches("temperature/#"): - print(f"[temperature/#] {message.payload}") + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + if message.topic.matches("humidity/inside"): + print(f"[humidity/inside] {message.payload}") + if message.topic.matches("+/outside"): + print(f"[+/outside] {message.payload}") + if message.topic.matches("temperature/#"): + print(f"[temperature/#] {message.payload}") asyncio.run(main()) @@ -62,9 +60,9 @@ For details on the `+` and `#` wildcards and what topics they match, see the [OA ## The message queue -Messages are queued and returned sequentially from `Client.messages()`. +Messages are queued internally and returned sequentially from `Client.messages`. -The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other types of asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `Client.messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`. +The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other types of asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to the `Client` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`. You can subclass `asyncio.PriorityQueue` to queue based on priority. Messages are returned ascendingly by their priority values. In the case of ties, messages with lower message identifiers are returned first. @@ -87,12 +85,13 @@ class CustomPriorityQueue(asyncio.PriorityQueue): async def main(): - async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages(queue_class=CustomPriorityQueue) as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + async with aiomqtt.Client( + "test.mosquitto.org", queue_class=CustomPriorityQueue + ) as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + print(message.payload) asyncio.run(main()) @@ -104,7 +103,7 @@ By default, the size of the queue is unlimited. You can set a limit by passing t ## Processing concurrently -Messages are queued and returned sequentially from `Client.messages()`. If a message takes a long time to handle, it blocks the handling of other messages. +Messages are queued internally and returned sequentially from `Client.messages`. If a message takes a long time to handle, it blocks the handling of other messages. You can handle messages concurrently by using an `asyncio.TaskGroup` like so: @@ -120,11 +119,11 @@ async def handle(message): async def main(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async with asyncio.TaskGroup() as tg: - async for message in messages: - tg.create_task(handle(message)) # Spawn new coroutine + await client.subscribe("temperature/#") + # Use a task group to manage and await all tasks + async with asyncio.TaskGroup() as tg: + async for message in client.messages: + tg.create_task(handle(message)) # Spawn new coroutine asyncio.run(main()) @@ -134,6 +133,8 @@ asyncio.run(main()) Coroutines only make sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead. ``` +## Multiple queues + The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic. The idea here is to implement a "distributor" that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let's see how this works for our temperature and humidity messages: @@ -160,19 +161,18 @@ humidity_queue = asyncio.Queue() async def distributor(client): - async with client.messages() as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - # Sort messages into the appropriate queues - async for message in messages: - if message.topic.matches("temperature/#"): - temperature_queue.put_nowait(message) - elif message.topic.matches("humidity/#"): - humidity_queue.put_nowait(message) + # Sort messages into the appropriate queues + async for message in client.messages: + if message.topic.matches("temperature/#"): + temperature_queue.put_nowait(message) + elif message.topic.matches("humidity/#"): + humidity_queue.put_nowait(message) async def main(): async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") # Use a task group to manage and await all tasks async with asyncio.TaskGroup() as tg: tg.create_task(distributor(client)) @@ -208,13 +208,13 @@ async def sleep(seconds): async def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): + # Use a task group to manage and await all tasks async with asyncio.TaskGroup() as tg: tg.create_task(sleep(2)) tg.create_task(listen()) # Start the listener task @@ -240,10 +240,9 @@ import aiomqtt async def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) background_tasks = set() @@ -280,10 +279,9 @@ import aiomqtt async def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): @@ -315,10 +313,9 @@ import aiomqtt async def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): diff --git a/alongside-fastapi-and-co.html b/alongside-fastapi-and-co.html index 4a2cb36..eab042a 100644 --- a/alongside-fastapi-and-co.html +++ b/alongside-fastapi-and-co.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -222,10 +223,8 @@

    Alongside FastAPI & Co.async def listen(client): - async with client.messages() as messages: - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + async for message in client.messages: + print(message.payload) client = None @@ -238,6 +237,7 @@

    Alongside FastAPI & Co.# Make client globally available client = c # Listen for MQTT messages in (unawaited) asyncio task + await client.subscribe("humidity/#") loop = asyncio.get_event_loop() task = loop.create_task(listen(client)) yield diff --git a/connecting-to-the-broker.html b/connecting-to-the-broker.html index 2d61041..c6cabed 100644 --- a/connecting-to-the-broker.html +++ b/connecting-to-the-broker.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -230,7 +231,7 @@

    Connecting to the broker

    Tip

    -

    If your use case does not allow you to use a context manager, you can use the client’s __aenter__ and __aexit__ methods directly as a workaround, similar to how you would use manual connect and disconnect methods. With this approach you need to make sure that ___aexit___ is also called in case of an exception. Avoid this workaround if you can, it’s a bit tricky to get right.

    +

    If your use case does not allow you to use a context manager, you can use the client’s __aenter__ and __aexit__ methods to connect and disconnect as a workaround. With this approach you need to ensure yourself that ___aexit___ is also called in case of an exception. Avoid this workaround if you can, it’s a bit tricky to get right.

    Note

    diff --git a/developer-interface.html b/developer-interface.html index 81c5ad6..340fa1f 100644 --- a/developer-interface.html +++ b/developer-interface.html @@ -3,7 +3,7 @@ - + Developer interface - aiomqtt @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -217,7 +218,7 @@

    Developer interface#

    -class aiomqtt.Client(hostname: str, port: int = 1883, *, username: str | None = None, password: str | None = None, logger: logging.Logger | None = None, client_id: str | None = None, tls_context: ssl.SSLContext | None = None, tls_params: TLSParameters | None = None, tls_insecure: bool | None = None, proxy: ProxySettings | None = None, protocol: ProtocolVersion | None = None, will: Will | None = None, clean_session: bool | None = None, transport: str = 'tcp', timeout: float | None = None, keepalive: int = 60, bind_address: str = '', bind_port: int = 0, clean_start: int = 3, properties: Properties | None = None, message_retry_set: int = 20, socket_options: Iterable[SocketOption] | None = None, max_concurrent_outgoing_calls: int | None = None, websocket_path: str | None = None, websocket_headers: WebSocketHeaders | None = None, max_inflight_messages: int | None = None, max_queued_messages: int | None = None)
    +class aiomqtt.Client(hostname: str, port: int = 1883, *, username: str | None = None, password: str | None = None, logger: logging.Logger | None = None, identifier: str | None = None, queue_type: type[asyncio.Queue[Message]] | None = None, protocol: ProtocolVersion | None = None, will: Will | None = None, clean_session: bool | None = None, transport: str = 'tcp', timeout: float | None = None, keepalive: int = 60, bind_address: str = '', bind_port: int = 0, clean_start: int = 3, max_queued_incoming_messages: int | None = None, max_queued_outgoing_messages: int | None = None, max_inflight_messages: int | None = None, max_concurrent_outgoing_calls: int | None = None, properties: mqtt.Properties | None = None, tls_context: ssl.SSLContext | None = None, tls_params: TLSParameters | None = None, tls_insecure: bool | None = None, proxy: ProxySettings | None = None, socket_options: Iterable[SocketOption] | None = None, websocket_path: str | None = None, websocket_headers: WebSocketHeaders | None = None)

    The async context manager that manages the connection to the broker.

    Parameters:
    @@ -227,12 +228,11 @@

    Client

    username – The username to authenticate with.

  • password – The password to authenticate with.

  • logger – Custom logger instance.

  • -
  • client_id – The client ID to use. If None, one will be generated -automatically.

  • -
  • tls_context – The SSL/TLS context.

  • -
  • tls_params – The SSL/TLS configuration to use.

  • -
  • tls_insecure – Enable/disable server hostname verification when using SSL/TLS.

  • -
  • proxy – Configure a proxy for the connection.

  • +
  • identifier – The client identifier. Generated automatically if None.

  • +
  • queue_type – The class to use for the queue. The default is +asyncio.Queue, which stores messages in FIFO order. For LIFO order, +you can use asyncio.LifoQueue; For priority order you can subclass +asyncio.PriorityQueue.

  • protocol – The version of the MQTT protocol.

  • will – The will message to publish if the client disconnects unexpectedly.

  • clean_session – If True, the broker will remove all information about this @@ -247,19 +247,48 @@

    Client

    bind_port – The network port to bind this client to.

  • clean_start – (MQTT v5.0 only) Set the clean start flag always, never, or only on the first successful connection to the broker.

  • +
  • max_queued_incoming_messages – Restricts the incoming message queue size. If the +queue is full, further incoming messages are discarded. 0 or less means +unlimited (the default).

  • +
  • max_queued_outgoing_messages – Resticts the outgoing message queue size. If the +queue is full, further outgoing messages are discarded. 0 means +unlimited (the default).

  • +
  • max_inflight_messages – The maximum number of messages with QoS > 0 that can +be part way through their network flow at once.

  • +
  • max_concurrent_outgoing_calls – The maximum number of concurrent outgoing calls.

  • properties – (MQTT v5.0 only) The properties associated with the client.

  • -
  • message_retry_set – Deprecated.

  • +
  • tls_context – The SSL/TLS context.

  • +
  • tls_params – The SSL/TLS configuration to use.

  • +
  • tls_insecure – Enable/disable server hostname verification when using SSL/TLS.

  • +
  • proxy – Configure a proxy for the connection.

  • socket_options – Options to pass to the underlying socket.

  • -
  • max_concurrent_outgoing_calls – The maximum number of concurrent outgoing calls.

  • websocket_path – The path to use for websockets.

  • websocket_headers – The headers to use for websockets.

  • -
  • max_inflight_messages – The maximum number of messages with QoS > 0 that can -be part way through their network flow at once.

  • -
  • max_queued_messages – The maximum number of messages in the outgoing message -queue. 0 means unlimited.

  • +
    +
    +messages
    +

    Async generator that yields messages from the underlying message queue.

    +
    +
    Type:
    +

    AsyncGenerator[aiomqtt.client.Message, None]

    +
    +
    +
    + +
    +
    +identifier
    +

    The client identifier.

    +
    +
    Type:
    +

    str

    +
    +
    +
    +
    async subscribe(topic: str | tuple[str, paho.mqtt.subscribeoptions.SubscribeOptions] | list[tuple[str, paho.mqtt.subscribeoptions.SubscribeOptions]] | list[tuple[str, int]], qos: int = 0, options: paho.mqtt.subscribeoptions.SubscribeOptions | None = None, properties: paho.mqtt.properties.Properties | None = None, *args: Any, timeout: float | None = None, **kwargs: Any) tuple[int] | list[paho.mqtt.reasoncodes.ReasonCodes]
    @@ -325,28 +354,6 @@

    Client

    -
    -
    -messages(*, queue_class: type[asyncio.queues.Queue[aiomqtt.client.Message]] = <class 'asyncio.queues.Queue'>, queue_maxsize: int = 0) AsyncGenerator[AsyncGenerator[Message, None], None]
    -

    Async context manager that creates a queue for incoming messages.

    -
    -
    Parameters:
    -
      -
    • queue_class – The class to use for the queue. The default is -asyncio.Queue, which returns messages in FIFO order. For LIFO order, -you can use asyncio.LifoQueue; For priority order you can subclass -asyncio.PriorityQueue.

    • -
    • queue_maxsize – Restricts the queue size. If the queue is full, incoming -messages will be discarded and a warning logged. If set to 0 or -less, the queue size is infinite.

    • -
    -
    -
    Returns:
    -

    An async generator that yields messages from the underlying queue.

    -
    -
    -
    -
    async __aenter__() Self
    @@ -366,10 +373,10 @@

    ClientMessage#

    -class aiomqtt.Message(topic: str | aiomqtt.client.Topic, payload: str | bytes | bytearray | int | float | None, qos: int, retain: bool, mid: int, properties: paho.mqtt.properties.Properties | None)
    +class aiomqtt.Message(topic: str | aiomqtt.topic.Topic, payload: str | bytes | bytearray | int | float | None, qos: int, retain: bool, mid: int, properties: paho.mqtt.properties.Properties | None)

    Wraps the paho-mqtt message class to allow using our own matching logic.

    This class is not meant to be instantiated by the user. Instead, it is yielded by -the async generator returned from Client.messages().

    +the async generator Client.messages.

    Parameters:
    -

    API reference

    +

    reference

    Project links

    diff --git a/index.html b/index.html index ce1e696..6fcccda 100644 --- a/index.html +++ b/index.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -221,10 +222,9 @@

    The idiomatic asyncio MQTT client
    async with Client("test.mosquitto.org") as client:
    -    async with client.messages() as messages:
    -        await client.subscribe("humidity/#")
    -        async for message in messages:
    -            print(message.payload)
    +    await client.subscribe("humidity/#")
    +    async for message in client.messages:
    +        print(message.payload)
     

    aiomqtt combines the stability of the time-proven paho-mqtt library with an idiomatic asyncio interface:

    diff --git a/introduction.html b/introduction.html index 05b0753..ad2cf22 100644 --- a/introduction.html +++ b/introduction.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    diff --git a/migration-guide-v2.html b/migration-guide-v2.html new file mode 100644 index 0000000..649826b --- /dev/null +++ b/migration-guide-v2.html @@ -0,0 +1,441 @@ + + + + + + + + + Migration guide: v2.0.0 - aiomqtt + + + + + + + + + + + + + + + + + Contents + + + + + + Menu + + + + + + + + Expand + + + + + + Light mode + + + + + + + + + + + + + + Dark mode + + + + + + + Auto light/dark mode + + + + + + + + + + + + + + + + + + + +
    +
    +
    + +
    +
    +
    aiomqtt
    +
    +
    +
    + +
    + +
    +
    + +
    +
    +
    + + + + + Back to top + +
    + +
    + +
    + +
    +
    +
    +

    Migration guide: v2.0.0#

    +

    Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are:

    +
      +
    • The deprecated connect and disconnect methods have been removed

    • +
    • The deprecated filtered_messages and unfiltered_messages methods have been removed

    • +
    • User-managed queues for incoming messages have been replaced with a single client-wide queue

    • +
    • Some arguments to the Client have been renamed or removed

    • +
    +
    +

    Changes to the client lifecycle#

    +

    The deprecated connect and disconnect methods have been removed. The best way to connect and disconnect from the broker is through the client’s context manager:

    +
    import asyncio
    +import aiomqtt
    +
    +
    +async def main():
    +    async with aiomqtt.Client("test.mosquitto.org") as client:
    +        await client.publish("temperature/outside", payload=28.4)
    +
    +
    +asyncio.run(main())
    +
    +
    +

    If your use case does not allow you to use a context manager, you can use the client’s __aenter__ and __aexit__ methods almost interchangeably in place of the removed connect and disconnect methods.

    +

    The __aenter__ and __aexit__ methods are designed to be called by the async with statement when the execution enters and exits the context manager. However, we can also execute them manually:

    +
    import asyncio
    +import aiomqtt
    +
    +
    +async def main():
    +    client = aiomqtt.Client("test.mosquitto.org")
    +    await client.__aenter__()
    +    try:
    +        await client.publish("temperature/outside", payload=28.4)
    +    finally:
    +        await client.__aexit__(None, None, None)
    +
    +
    +asyncio.run(main())
    +
    +
    +

    __aenter__ is equivalent to connect. __aexit__ is equivalent to disconnect except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly.

    +
    +

    Note

    +

    __aexit__ expects three arguments: exc_type, exc, and tb. These arguments describe the exception that caused the context manager to exit, if any. You can pass None to all of these arguments in a manual call to __aexit__.

    +
    +
    +
    +

    Changes to the message queue#

    +

    The filtered_messages, unfiltered_messages, and messages methods have been removed and replaced with a single client-wide message queue.

    +

    A minimal example of printing all messages (unfiltered) looks like this:

    +
    import asyncio
    +import aiomqtt
    +
    +
    +async def main():
    +    async with aiomqtt.Client("test.mosquitto.org") as client:
    +        await client.subscribe("temperature/#")
    +        async for message in client.messages:
    +            print(message.payload)
    +
    +
    +asyncio.run(main())
    +
    +
    +

    To handle messages from different topics differently, we can use Topic.matches():

    +
    import asyncio
    +import aiomqtt
    +
    +
    +async def main():
    +    async with aiomqtt.Client("test.mosquitto.org") as client:
    +        await client.subscribe("temperature/#")
    +        await client.subscribe("humidity/#")
    +        async for message in client.messages:
    +            if message.topic.matches("humidity/inside"):
    +                print(f"[humidity/inside] {message.payload}")
    +            if message.topic.matches("+/outside"):
    +                print(f"[+/outside] {message.payload}")
    +            if message.topic.matches("temperature/#"):
    +                print(f"[temperature/#] {message.payload}")
    +
    +
    +asyncio.run(main())
    +
    +
    +
    +

    Note

    +

    In our example, messages to temperature/outside are handled twice!

    +
    +

    The filtered_messages, unfiltered_messages, and messages methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via Client.messages.

    +

    If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a “distributor” on top:

    +
    import asyncio
    +import aiomqtt
    +
    +
    +async def temperature_consumer():
    +    while True:
    +        message = await temperature_queue.get()
    +        print(f"[temperature/#] {message.payload}")
    +
    +
    +async def humidity_consumer():
    +    while True:
    +        message = await humidity_queue.get()
    +        print(f"[humidity/#] {message.payload}")
    +
    +
    +temperature_queue = asyncio.Queue()
    +humidity_queue = asyncio.Queue()
    +
    +
    +async def distributor(client):
    +    # Sort messages into the appropriate queues
    +    async for message in client.messages:
    +        if message.topic.matches("temperature/#"):
    +            temperature_queue.put_nowait(message)
    +        elif message.topic.matches("humidity/#"):
    +            humidity_queue.put_nowait(message)
    +
    +
    +async def main():
    +    async with aiomqtt.Client("test.mosquitto.org") as client:
    +        await client.subscribe("temperature/#")
    +        await client.subscribe("humidity/#")
    +        # Use a task group to manage and await all tasks
    +        async with asyncio.TaskGroup() as tg:
    +            tg.create_task(distributor(client))
    +            tg.create_task(temperature_consumer())
    +            tg.create_task(humidity_consumer())
    +
    +
    +asyncio.run(main())
    +
    +
    +
    +
    +

    Changes to client arguments#

    +
      +
    • The queue_class and queue_maxsize arguments to filtered_messages, unfiltered_messages, and messages have been moved to the Client and have been renamed to queue_type and max_queued_incoming_messages

    • +
    • The max_queued_messages client argument has been renamed to max_queued_outgoing_messages

    • +
    • The deprecated message_retry_set client argument has been removed

    • +
    +
    +
    + +
    +
    + +
    + +
    +
    + + + + + + + + + + \ No newline at end of file diff --git a/objects.inv b/objects.inv index dd4a08b..130b715 100644 Binary files a/objects.inv and b/objects.inv differ diff --git a/publishing-a-message.html b/publishing-a-message.html index e7ae444..f29ea96 100644 --- a/publishing-a-message.html +++ b/publishing-a-message.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    diff --git a/reconnection.html b/reconnection.html index 38b94ee..91a985c 100644 --- a/reconnection.html +++ b/reconnection.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -226,10 +227,9 @@

    Reconnectionwhile True: try: async with client: - async with client.messages() as messages: - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + await client.subscribe("humidity/#") + async for message in client.messages: + print(message.payload) except aiomqtt.MqttError: print(f"Connection lost; Reconnecting in {interval} seconds ...") await asyncio.sleep(interval) diff --git a/search.html b/search.html index 053c354..ecb3cfc 100644 --- a/search.html +++ b/search.html @@ -163,15 +163,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    diff --git a/searchindex.js b/searchindex.js index 2af6306..02e703c 100644 --- a/searchindex.js +++ b/searchindex.js @@ -1 +1 @@ -Search.setIndex({"docnames": ["alongside-fastapi-and-co", "connecting-to-the-broker", "developer-interface", "index", "introduction", "publishing-a-message", "reconnection", "subscribing-to-a-topic"], "filenames": ["alongside-fastapi-and-co.md", "connecting-to-the-broker.md", "developer-interface.md", "index.md", "introduction.md", "publishing-a-message.md", "reconnection.md", "subscribing-to-a-topic.md"], "titles": ["Alongside FastAPI & Co.", "Connecting to the broker", "Developer interface", "The idiomatic asyncio MQTT client", "Introduction", "Publishing a message", "Reconnection", "Subscribing to a topic"], "terms": {"mani": 0, "web": [0, 7], "framework": [0, 7], "take": [0, 7], "control": 0, "over": 0, "main": [0, 1, 3, 5, 6, 7], "function": [0, 1], "which": [0, 2, 5, 7], "can": [0, 1, 2, 3, 5, 6, 7], "make": [0, 1, 6, 7], "tricki": [0, 1], "figur": 0, "out": [0, 1, 7], "where": [0, 7], "creat": [0, 1, 2, 7], "client": [0, 1, 5, 6, 7], "how": [0, 1, 4, 7], "share": 0, "thi": [0, 1, 2, 3, 4, 5, 6, 7], "connect": [0, 2, 6], "With": [0, 1], "0": [0, 1, 2, 3, 5], "93": 0, "starlett": 0, "you": [0, 1, 2, 3, 4, 5, 7], "us": [0, 1, 2, 3, 4, 5, 7], "lifespan": 0, "context": [0, 1, 2, 6], "manag": [0, 1, 2, 7], "safe": [0, 7], "set": [0, 1, 2, 5, 7], "up": [0, 1], "global": 0, "instanc": [0, 1, 2], "i": [0, 1, 2, 3, 4, 5, 6, 7], "minim": [0, 1, 5, 7], "work": [0, 1, 5, 7], "exampl": [0, 1, 5, 7], "side": 0, "an": [0, 1, 2, 3, 4, 5, 6, 7], "aiomqtt": [0, 1, 2, 3, 4, 5, 6, 7], "listen": [0, 6], "task": [0, 7], "messag": [0, 1, 3, 6], "public": [0, 1, 2], "get": [0, 1, 3, 4, 7], "import": [0, 1, 3, 5, 6, 7], "asyncio": [0, 1, 2, 4, 5, 6, 7], "contextlib": 0, "async": [0, 1, 2, 3, 5, 6, 7], "def": [0, 1, 5, 6, 7], "await": [0, 1, 3, 5, 6, 7], "subscrib": [0, 1, 2, 3, 5, 6], "humid": [0, 1, 3, 6, 7], "print": [0, 3, 6, 7], "payload": [0, 1, 2, 3, 6, 7], "none": [0, 2, 5], "asynccontextmanag": 0, "app": 0, "test": [0, 1, 3, 5, 6, 7], "mosquitto": [0, 1, 3, 5, 6, 7], "org": [0, 1, 3, 5, 6, 7], "c": 0, "avail": [0, 1], "mqtt": [0, 1, 2, 4, 5, 7], "unawait": [0, 7], "loop": [0, 3, 7], "get_event_loop": [0, 7], "create_task": [0, 7], "yield": [0, 2], "cancel": [0, 7], "wait": [0, 2, 3, 7], "try": [0, 3, 4, 6, 7], "except": [0, 1, 6, 7], "cancellederror": [0, 7], "pass": [0, 1, 2, 7], "publish": [0, 1, 2, 3, 7], "outsid": [0, 1, 3, 5, 7], "38": [0, 1, 3], "combin": [0, 3, 7], "some": [0, 4, 7], "concept": 0, "address": [0, 2], "more": [0, 1, 3], "detail": [0, 7], "other": [0, 3, 7], "section": [0, 7], "The": [0, 1, 2, 5, 6], "between": [0, 5], "rout": 0, "explain": [0, 4], "we": [0, 1, 3, 4, 6, 7], "don": [0, 4, 5, 7], "t": [0, 3, 4, 5, 7], "immedi": [0, 5], "order": [0, 2, 7], "avoid": [0, 1], "block": [0, 6], "code": [0, 1, 3, 6, 7], "without": [0, 1], "initi": [0, 1], "": [0, 1, 2, 4, 5, 6, 7], "state": 0, "instead": [0, 2, 3, 7], "variabl": 0, "To": [1, 5, 6, 7], "topic": [1, 5], "first": [1, 2, 7], "need": [1, 4, 7], "temperatur": [1, 5, 7], "28": [1, 5], "4": [1, 5], "run": [1, 3, 5, 6, 7], "when": [1, 2, 4, 5, 7], "enter": 1, "statement": 1, "disconnect": [1, 2, 3], "exit": [1, 7], "again": [1, 7], "easier": 1, "resourc": 1, "like": [1, 3, 6, 7], "network": [1, 2, 6], "file": 1, "ensur": 1, "teardown": 1, "logic": [1, 2], "alwai": [1, 2], "execut": [1, 7], "even": 1, "case": [1, 7], "If": [1, 2, 3, 4, 5, 7], "your": [1, 3, 4, 7], "doe": 1, "allow": [1, 2], "__aenter__": [1, 2], "__aexit__": [1, 2], "method": [1, 2, 3], "directli": [1, 3], "workaround": 1, "similar": [1, 2], "would": 1, "manual": 1, "approach": [1, 7], "sure": 1, "___aexit___": 1, "also": [1, 5], "call": [1, 2, 3, 7], "bit": [1, 7], "right": 1, "ani": [1, 2, 6], "credenti": 1, "altern": 1, "our": [1, 2, 6, 7], "contribut": 1, "guid": [1, 4], "contain": [1, 2], "explan": 1, "spin": 1, "local": [1, 2], "docker": 1, "all": [1, 2, 5, 7], "document": [1, 4], "ar": [1, 3, 4, 5, 6, 7], "self": [1, 2, 7], "runnabl": 1, "For": [1, 2, 4, 5, 7], "list": [1, 2], "argument": [1, 2], "see": [1, 5, 7], "api": 1, "refer": [1, 4, 7], "often": 1, "want": [1, 5, 7], "send": [1, 5], "receiv": [1, 5, 7], "multipl": [1, 5, 7], "differ": [1, 3, 5, 7], "locat": 1, "could": 1, "new": [1, 4, 5, 7], "each": [1, 5, 7], "time": [1, 2, 3, 5, 6, 7], "veri": [1, 3, 7], "perform": 1, "ll": [1, 7], "bandwidth": 1, "publish_temperatur": 1, "publish_humid": 1, "insid": [1, 7], "non": [1, 5], "kept": 1, "aliv": 1, "goe": 1, "offlin": 1, "mean": [1, 2, 6], "store": [1, 5], "subscript": [1, 2, 7], "queue": [1, 2], "qo": [1, 2, 7], "1": [1, 3, 5, 7], "2": [1, 5, 7], "miss": [1, 4], "ha": 1, "yet": 1, "acknowledg": [1, 5], "retransmit": 1, "reconnect": 1, "clean_sess": [1, 2], "paramet": [1, 2, 5, 7], "fals": [1, 2], "true": [1, 2, 5, 6, 7], "amount": [1, 7], "queu": [1, 2, 7], "limit": [1, 7], "memori": 1, "come": [1, 7], "back": [1, 5], "onlin": 1, "long": [1, 6, 7], "eventu": 1, "start": [1, 2, 3, 4, 7], "discard": [1, 2], "class": [2, 7], "hostnam": 2, "str": [2, 5], "port": 2, "int": [2, 5], "1883": 2, "usernam": 2, "password": 2, "logger": 2, "log": 2, "client_id": 2, "tls_context": 2, "ssl": 2, "sslcontext": 2, "tls_param": 2, "tlsparamet": 2, "tls_insecur": 2, "bool": 2, "proxi": 2, "proxyset": 2, "protocol": 2, "protocolvers": 2, "Will": 2, "transport": 2, "tcp": 2, "timeout": 2, "float": [2, 5], "keepal": 2, "60": 2, "bind_address": 2, "bind_port": 2, "clean_start": 2, "3": [2, 3, 7], "properti": 2, "message_retry_set": 2, "20": 2, "socket_opt": 2, "iter": 2, "socketopt": 2, "max_concurrent_outgoing_cal": 2, "websocket_path": 2, "websocket_head": 2, "websockethead": 2, "max_inflight_messag": 2, "max_queued_messag": 2, "broker": [2, 3, 5], "ip": 2, "remot": 2, "authent": 2, "custom": 2, "id": 2, "one": [2, 5, 7], "gener": [2, 7], "automat": [2, 5], "tl": 2, "configur": 2, "enabl": 2, "disabl": 2, "server": 2, "verif": 2, "version": 2, "unexpectedli": 2, "remov": [2, 7], "inform": 2, "about": [2, 3], "persist": 2, "retain": 2, "either": 2, "websocket": 2, "default": [2, 3, 5, 7], "commun": [2, 5], "second": [2, 6, 7], "bind": 2, "v5": 2, "onli": [2, 3, 5, 7], "clean": 2, "flag": 2, "never": 2, "success": 2, "associ": 2, "deprec": 2, "option": [2, 5, 7], "underli": [2, 3], "socket": 2, "maximum": 2, "number": 2, "concurr": 2, "outgo": 2, "path": 2, "header": 2, "part": [2, 5], "wai": [2, 7], "through": [2, 5, 7], "flow": 2, "onc": [2, 5], "unlimit": [2, 7], "tupl": 2, "paho": [2, 3], "subscribeopt": 2, "arg": 2, "kwarg": 2, "reasoncod": 2, "request": [2, 4, 7], "level": [2, 5], "addit": 2, "posit": 2, "complet": 2, "math": 2, "inf": 2, "indefinit": 2, "keyword": 2, "unsubscrib": 2, "from": [2, 3, 6, 7], "unsubscript": 2, "byte": [2, 5], "bytearrai": [2, 5], "queue_class": [2, 7], "type": [2, 3, 5, 7], "queue_maxs": [2, 7], "asyncgener": 2, "incom": [2, 7], "return": [2, 3, 5, 7], "fifo": [2, 7], "lifo": [2, 7], "lifoqueu": [2, 7], "prioriti": [2, 7], "subclass": [2, 7], "priorityqueu": [2, 7], "restrict": 2, "size": [2, 7], "full": 2, "warn": 2, "less": 2, "infinit": [2, 7], "exc_typ": 2, "baseexcept": 2, "exc": 2, "tb": 2, "tracebacktyp": 2, "mid": [2, 7], "wrap": [2, 6], "own": 2, "match": [2, 7], "meant": 2, "instanti": 2, "user": 2, "wa": 2, "qualiti": [2, 7], "servic": [2, 7], "whether": 2, "valu": [2, 7], "string": [2, 5], "check": 2, "given": 2, "against": [2, 6], "otherwis": 2, "A": 2, "placehold": 2, "write": 3, "stabil": 3, "proven": 3, "librari": 3, "interfac": 3, "No": 3, "callback": 3, "welcom": 3, "mqtterror": [3, 6], "grace": 3, "forget": [3, 7], "on_unsubscrib": 3, "on_disconnect": 3, "etc": 3, "support": 3, "5": [3, 6, 7], "fulli": 3, "hint": 3, "did": 3, "mention": 3, "via": 3, "pip": 3, "depend": 3, "latest": 3, "github": [3, 4], "git": 3, "http": [3, 7], "com": 3, "sbtinstrument": 3, "sinc": 3, "python": [3, 7], "8": 3, "event": 3, "proactoreventloop": 3, "said": [3, 4], "doesn": [3, 7], "add_read": 3, "requir": 3, "pleas": 3, "switch": 3, "built": 3, "selectoreventloop": 3, "chang": 3, "selector": 3, "platform": 3, "sy": 3, "lower": [3, 7], "win32": 3, "o": [3, 7], "name": 3, "nt": 3, "set_event_loop_polici": 3, "windowsselectoreventlooppolici": 3, "applic": [3, 6], "usual": [3, 5], "under": 3, "bsd": 3, "claus": 3, "dual": 3, "One": 3, "so": [3, 6, 7], "eclips": 3, "distribut": 3, "v1": 3, "It": [3, 4], "almost": 3, "word": 3, "ident": 3, "copyright": 3, "owner": 3, "edl": 3, "holder": 3, "foundat": 3, "inc": 3, "re": [3, 4, 7], "happi": [3, 4], "read": [3, 4], "md": 3, "adher": 3, "semant": 3, "break": 3, "occur": 3, "major": 3, "x": 3, "releas": 3, "live": 3, "follow": 3, "principl": 3, "keep": [3, 7], "what": [3, 7], "look": 3, "There": 3, "few": 3, "synchron": 3, "micropython": 3, "asynchron": 3, "microcontrol": 3, "gmqtt": 3, "fastapi": 3, "wrapper": 3, "around": 3, "simplifi": 3, "integr": 3, "amqtt": 3, "includ": 3, "trio": 3, "base": [3, 7], "aim": 4, "cover": 4, "everyth": [4, 7], "know": 4, "project": 4, "expect": 4, "knowledg": 4, "thing": [4, 5], "clearli": 4, "possibl": [4, 7], "stuck": 4, "hesit": 4, "discuss": 4, "help": 4, "recommend": 4, "hivemq": 4, "essenti": 4, "afterward": [4, 7], "oasi": [4, 7], "specif": [4, 7], "great": 4, "realpython": 4, "walkthrough": 4, "doc": 4, "big": 4, "open": 4, "issu": 4, "pull": 4, "find": 4, "error": 4, "have": [4, 5, 6, 7], "idea": [4, 7], "improv": 4, "easi": [4, 7], "unclear": 4, "newcom": 4, "alreadi": 4, "familiar": 4, "fresh": 4, "ey": 4, "here": [4, 7], "That": [4, 7], "let": [4, 5, 7], "dive": 4, "transmit": 5, "stream": 5, "accept": 5, "convert": 5, "struct": 5, "pack": 5, "object": 5, "specifi": 5, "zero": 5, "length": 5, "sent": 5, "standard": 5, "implement": [5, 7], "yourself": 5, "dict": 5, "json": 5, "dump": 5, "On": 5, "end": 5, "load": 5, "decod": 5, "reliabl": 5, "three": 5, "At": 5, "most": 5, "guarante": 5, "deliveri": 5, "fastest": 5, "least": 5, "deliv": 5, "possibli": 5, "sender": 5, "until": [5, 7], "receipt": 5, "exactli": 5, "four": 5, "handshak": 5, "slowest": 5, "two": 5, "same": 5, "defin": 5, "relai": 5, "recent": 5, "last": 5, "after": 5, "thei": [5, 7], "per": 5, "previou": [5, 7], "overwritten": 5, "delet": 5, "empti": 5, "howev": 5, "necessari": 5, "overwrit": 5, "ones": 5, "inher": 6, "unstabl": 6, "fail": 6, "especi": 6, "challeng": 6, "resili": 6, "failur": 6, "abl": 6, "detect": 6, "recov": 6, "them": [6, 7], "design": 6, "reusabl": 6, "reentrant": 6, "interv": 6, "while": [6, 7], "f": [6, 7], "lost": 6, "sleep": [6, 7], "wildcard": 7, "now": 7, "appear": 7, "consol": 7, "imagin": 7, "measur": 7, "handl": 7, "provid": 7, "In": 7, "twice": 7, "sequenti": 7, "basi": 7, "modifi": 7, "e": 7, "g": 7, "ascendingli": 7, "ti": 7, "identifi": 7, "sai": 7, "priorit": 7, "custompriorityqueu": 7, "_put": 7, "item": 7, "assign": 7, "super": 7, "_get": 7, "By": 7, "taskgroup": 7, "simul": 7, "bound": 7, "tg": 7, "spawn": 7, "coroutin": 7, "sens": 7, "cpu": 7, "should": 7, "snippet": 7, "abov": 7, "sometim": 7, "singl": 7, "distributor": 7, "sort": 7, "temperature_consum": 7, "temperature_queu": 7, "humidity_consum": 7, "humidity_queu": 7, "appropri": 7, "put_nowait": 7, "elif": 7, "group": 7, "notic": 7, "program": 7, "finish": 7, "practic": 7, "gather": 7, "11": 7, "alongsid": 7, "slept": 7, "becaus": 7, "fire": 7, "care": 7, "rais": 7, "propag": 7, "explicitli": 7, "unhandl": 7, "silent": 7, "ignor": 7, "background_task": 7, "save": 7, "garbag": 7, "collect": 7, "add": 7, "add_done_callback": 7, "do": 7, "someth": 7, "els": 7, "might": 7, "done": 7, "certain": 7, "introduc": 7, "neat": 7, "featur": 7, "result": 7, "timeouterror": 7}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"alongsid": 0, "fastapi": 0, "co": 0, "connect": 1, "broker": 1, "share": 1, "persist": 1, "session": 1, "develop": 2, "interfac": 2, "client": [2, 3], "messag": [2, 5, 7], "topic": [2, 7], "wildcard": 2, "The": [3, 7], "idiomat": 3, "asyncio": 3, "mqtt": 3, "instal": 3, "note": 3, "window": 3, "user": 3, "licens": 3, "contribut": 3, "version": 3, "changelog": 3, "relat": 3, "project": 3, "introduct": 4, "publish": 5, "payload": 5, "encod": 5, "qualiti": 5, "servic": 5, "qo": 5, "retain": 5, "reconnect": 6, "subscrib": 7, "filter": 7, "queue": 7, "process": 7, "concurr": 7, "listen": 7, "without": 7, "block": 7, "stop": 7, "after": 7, "timeout": 7}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"Alongside FastAPI & Co.": [[0, "alongside-fastapi-co"]], "Connecting to the broker": [[1, "connecting-to-the-broker"]], "Sharing the connection": [[1, "sharing-the-connection"]], "Persistent sessions": [[1, "persistent-sessions"]], "Developer interface": [[2, "developer-interface"]], "Client": [[2, "client"]], "Message": [[2, "message"]], "Topic": [[2, "topic"]], "Wildcard": [[2, "wildcard"]], "The idiomatic asyncio MQTT client": [[3, "the-idiomatic-asyncio-mqtt-client"]], "Installation": [[3, "installation"]], "Note for Windows users": [[3, "note-for-windows-users"]], "License": [[3, "license"]], "Contributing": [[3, "contributing"]], "Versioning": [[3, "versioning"]], "Changelog": [[3, "changelog"]], "Related projects": [[3, "related-projects"]], "Introduction": [[4, "introduction"]], "Publishing a message": [[5, "publishing-a-message"]], "Payload encoding": [[5, "payload-encoding"]], "Quality of Service (QoS)": [[5, "quality-of-service-qos"]], "Retained messages": [[5, "retained-messages"]], "Reconnection": [[6, "reconnection"]], "Subscribing to a topic": [[7, "subscribing-to-a-topic"]], "Filtering messages": [[7, "filtering-messages"]], "The message queue": [[7, "the-message-queue"]], "Processing concurrently": [[7, "processing-concurrently"]], "Listening without blocking": [[7, "listening-without-blocking"]], "Stop listening": [[7, "stop-listening"]], "Stop listening after timeout": [[7, "stop-listening-after-timeout"]]}, "indexentries": {}}) \ No newline at end of file +Search.setIndex({"docnames": ["alongside-fastapi-and-co", "connecting-to-the-broker", "developer-interface", "index", "introduction", "migration-guide-v2", "publishing-a-message", "reconnection", "subscribing-to-a-topic"], "filenames": ["alongside-fastapi-and-co.md", "connecting-to-the-broker.md", "developer-interface.md", "index.md", "introduction.md", "migration-guide-v2.md", "publishing-a-message.md", "reconnection.md", "subscribing-to-a-topic.md"], "titles": ["Alongside FastAPI & Co.", "Connecting to the broker", "Developer interface", "The idiomatic asyncio MQTT client", "Introduction", "Migration guide: v2.0.0", "Publishing a message", "Reconnection", "Subscribing to a topic"], "terms": {"mani": 0, "web": [0, 8], "framework": [0, 8], "take": [0, 8], "control": 0, "over": [0, 8], "main": [0, 1, 3, 5, 6, 7, 8], "function": [0, 1], "which": [0, 2, 6, 8], "can": [0, 1, 2, 3, 5, 6, 7, 8], "make": [0, 1, 7, 8], "tricki": [0, 1], "figur": 0, "out": [0, 1, 8], "where": [0, 8], "creat": [0, 1, 5, 8], "client": [0, 1, 6, 7, 8], "how": [0, 1, 4, 8], "share": 0, "thi": [0, 1, 2, 3, 4, 5, 6, 7, 8], "connect": [0, 2, 5, 7], "With": [0, 1], "0": [0, 1, 2, 3, 6], "93": 0, "starlett": 0, "you": [0, 1, 2, 3, 4, 5, 6, 8], "us": [0, 1, 2, 3, 4, 5, 6, 8], "lifespan": 0, "context": [0, 1, 2, 5, 7], "manag": [0, 1, 2, 5, 8], "safe": [0, 8], "set": [0, 1, 2, 6, 8], "up": [0, 1], "global": 0, "instanc": [0, 1, 2], "i": [0, 1, 2, 3, 4, 5, 6, 7, 8], "minim": [0, 1, 5, 6, 8], "work": [0, 1, 6, 8], "exampl": [0, 1, 5, 6, 8], "side": 0, "an": [0, 1, 3, 4, 5, 6, 7, 8], "aiomqtt": [0, 1, 2, 3, 4, 5, 6, 7, 8], "listen": [0, 7], "task": [0, 5, 8], "messag": [0, 1, 3, 7], "public": [0, 1, 2], "get": [0, 1, 3, 4, 5, 8], "import": [0, 1, 3, 5, 6, 7, 8], "asyncio": [0, 1, 2, 4, 5, 6, 7, 8], "contextlib": 0, "async": [0, 1, 2, 3, 5, 6, 7, 8], "def": [0, 1, 5, 6, 7, 8], "print": [0, 3, 5, 7, 8], "payload": [0, 1, 2, 3, 5, 7, 8], "none": [0, 2, 5, 6], "asynccontextmanag": 0, "app": 0, "test": [0, 1, 3, 5, 6, 7, 8], "mosquitto": [0, 1, 3, 5, 6, 7, 8], "org": [0, 1, 3, 5, 6, 7, 8], "c": 0, "avail": [0, 1], "mqtt": [0, 1, 2, 4, 6, 8], "unawait": [0, 8], "await": [0, 1, 3, 5, 6, 7, 8], "subscrib": [0, 1, 2, 3, 5, 6, 7], "humid": [0, 1, 3, 5, 7, 8], "loop": [0, 3, 8], "get_event_loop": [0, 8], "create_task": [0, 5, 8], "yield": [0, 2], "cancel": [0, 8], "wait": [0, 2, 3, 8], "try": [0, 3, 4, 5, 7, 8], "except": [0, 1, 5, 7, 8], "cancellederror": [0, 8], "pass": [0, 1, 2, 5, 8], "publish": [0, 1, 2, 3, 5, 8], "outsid": [0, 1, 3, 5, 6, 8], "38": [0, 1, 3], "combin": [0, 3, 8], "some": [0, 4, 5, 8], "concept": 0, "address": [0, 2], "more": [0, 1, 3], "detail": [0, 8], "other": [0, 3, 8], "section": [0, 8], "The": [0, 1, 2, 5, 6, 7], "between": [0, 6], "rout": 0, "explain": [0, 4], "we": [0, 1, 3, 4, 5, 7, 8], "don": [0, 4, 6, 8], "t": [0, 3, 4, 6, 8], "immedi": [0, 6], "order": [0, 2, 8], "avoid": [0, 1], "block": [0, 7], "code": [0, 1, 3, 7, 8], "without": [0, 1], "initi": [0, 1], "": [0, 1, 2, 4, 5, 6, 7, 8], "state": 0, "instead": [0, 2, 3, 5, 8], "variabl": 0, "To": [1, 5, 6, 7, 8], "topic": [1, 5, 6], "first": [1, 2, 8], "need": [1, 4, 5, 8], "temperatur": [1, 5, 6, 8], "28": [1, 5, 6], "4": [1, 5, 6], "run": [1, 3, 5, 6, 7, 8], "when": [1, 2, 4, 5, 6, 8], "enter": [1, 5], "statement": [1, 5], "disconnect": [1, 2, 3, 5], "exit": [1, 5, 8], "again": [1, 8], "easier": 1, "resourc": 1, "like": [1, 3, 5, 7, 8], "network": [1, 2, 7], "file": 1, "ensur": 1, "teardown": 1, "logic": [1, 2], "alwai": [1, 2], "execut": [1, 5, 8], "even": 1, "case": [1, 5, 8], "If": [1, 2, 3, 4, 5, 6, 8], "your": [1, 3, 4, 5, 8], "doe": [1, 5], "allow": [1, 2, 5], "__aenter__": [1, 2, 5], "__aexit__": [1, 2, 5], "method": [1, 2, 3, 5], "workaround": 1, "approach": [1, 8], "yourself": [1, 6], "___aexit___": 1, "also": [1, 5, 6], "call": [1, 2, 3, 5, 8], "bit": [1, 8], "right": 1, "ani": [1, 2, 5, 7], "credenti": 1, "altern": 1, "our": [1, 2, 5, 7, 8], "contribut": 1, "guid": [1, 4], "contain": [1, 2], "explan": 1, "spin": 1, "local": [1, 2], "docker": 1, "all": [1, 2, 5, 6, 8], "document": [1, 4], "ar": [1, 2, 3, 4, 5, 6, 7, 8], "self": [1, 2, 8], "runnabl": 1, "For": [1, 2, 4, 6, 8], "list": [1, 2], "argument": [1, 2], "see": [1, 6, 8], "api": 1, "refer": [1, 4, 8], "often": 1, "want": [1, 6, 8], "send": [1, 6], "receiv": [1, 6, 8], "multipl": [1, 5, 6], "differ": [1, 3, 5, 6, 8], "locat": 1, "could": [1, 5], "new": [1, 4, 5, 6, 8], "each": [1, 6, 8], "time": [1, 2, 3, 5, 6, 7, 8], "veri": [1, 3, 8], "perform": [1, 2], "ll": [1, 8], "bandwidth": 1, "publish_temperatur": 1, "publish_humid": 1, "insid": [1, 5, 8], "non": [1, 6], "kept": 1, "aliv": 1, "goe": 1, "offlin": 1, "mean": [1, 2, 7], "store": [1, 2, 6], "subscript": [1, 2, 8], "queue": [1, 2], "qo": [1, 2, 8], "1": [1, 3, 6, 8], "2": [1, 5, 6, 8], "miss": [1, 4], "ha": [1, 5], "yet": 1, "acknowledg": [1, 6], "retransmit": 1, "reconnect": 1, "clean_sess": [1, 2], "paramet": [1, 2, 6, 8], "fals": [1, 2], "true": [1, 2, 5, 6, 7, 8], "amount": [1, 8], "queu": [1, 2, 8], "limit": [1, 8], "memori": 1, "come": [1, 8], "back": [1, 6], "onlin": 1, "long": [1, 7, 8], "eventu": 1, "start": [1, 2, 3, 4, 8], "discard": [1, 2], "class": [2, 8], "hostnam": 2, "str": [2, 6], "port": 2, "int": [2, 6], "1883": 2, "usernam": 2, "password": 2, "logger": 2, "log": 2, "identifi": [2, 8], "queue_typ": [2, 5], "type": [2, 3, 6, 8], "protocol": 2, "protocolvers": 2, "Will": 2, "bool": 2, "transport": 2, "tcp": 2, "timeout": 2, "float": [2, 6], "keepal": 2, "60": 2, "bind_address": 2, "bind_port": 2, "clean_start": 2, "3": [2, 3, 8], "max_queued_incoming_messag": [2, 5], "max_queued_outgoing_messag": [2, 5], "max_inflight_messag": 2, "max_concurrent_outgoing_cal": 2, "properti": 2, "tls_context": 2, "ssl": 2, "sslcontext": 2, "tls_param": 2, "tlsparamet": 2, "tls_insecur": 2, "proxi": 2, "proxyset": 2, "socket_opt": 2, "iter": [2, 8], "socketopt": 2, "websocket_path": 2, "websocket_head": 2, "websockethead": 2, "broker": [2, 3, 5, 6], "ip": 2, "remot": 2, "authent": 2, "custom": 2, "gener": [2, 8], "automat": [2, 6], "default": [2, 3, 6, 8], "fifo": [2, 8], "lifo": [2, 8], "lifoqueu": [2, 8], "prioriti": [2, 8], "subclass": [2, 8], "priorityqueu": [2, 8], "version": [2, 5], "unexpectedli": 2, "remov": [2, 5, 8], "inform": 2, "about": [2, 3], "persist": 2, "retain": 2, "either": 2, "websocket": 2, "commun": [2, 6], "second": [2, 7, 8], "bind": 2, "v5": 2, "onli": [2, 3, 6, 8], "clean": 2, "flag": 2, "never": 2, "success": 2, "restrict": 2, "incom": [2, 5, 8], "size": [2, 8], "full": 2, "further": 2, "less": 2, "unlimit": [2, 8], "restict": 2, "outgo": 2, "maximum": 2, "number": 2, "part": [2, 6], "wai": [2, 5, 8], "through": [2, 5, 6, 8], "flow": 2, "onc": [2, 6], "concurr": [2, 5], "associ": 2, "tl": 2, "configur": 2, "enabl": 2, "disabl": 2, "server": 2, "verif": 2, "option": [2, 6, 8], "underli": [2, 3], "socket": 2, "path": 2, "header": 2, "from": [2, 3, 5, 7, 8], "asyncgener": 2, "tupl": 2, "paho": [2, 3], "subscribeopt": 2, "arg": 2, "kwarg": 2, "reasoncod": 2, "request": [2, 4, 8], "level": [2, 6], "addit": 2, "posit": 2, "complet": 2, "math": 2, "inf": 2, "indefinit": 2, "keyword": 2, "unsubscrib": 2, "unsubscript": 2, "byte": [2, 6], "bytearrai": [2, 6], "exc_typ": [2, 5], "baseexcept": 2, "exc": [2, 5], "tb": [2, 5], "tracebacktyp": 2, "mid": [2, 8], "wrap": [2, 7], "own": 2, "match": [2, 5, 8], "meant": 2, "instanti": 2, "user": [2, 5], "wa": 2, "qualiti": [2, 8], "servic": [2, 8], "whether": 2, "id": 2, "valu": [2, 8], "string": [2, 6], "check": 2, "given": 2, "against": [2, 7], "return": [2, 3, 6, 8], "otherwis": 2, "A": [2, 5], "similar": 2, "placehold": 2, "access": [2, 5], "attribut": 2, "directli": [2, 3], "oper": 2, "write": 3, "stabil": 3, "proven": 3, "librari": 3, "interfac": 3, "No": 3, "callback": 3, "welcom": 3, "mqtterror": [3, 7], "grace": 3, "forget": [3, 8], "on_unsubscrib": 3, "on_disconnect": 3, "etc": 3, "support": 3, "5": [3, 7, 8], "fulli": 3, "hint": 3, "did": 3, "mention": 3, "via": [3, 5], "pip": 3, "depend": 3, "latest": 3, "github": [3, 4], "git": 3, "http": [3, 8], "com": 3, "sbtinstrument": 3, "sinc": 3, "python": [3, 8], "8": 3, "event": 3, "proactoreventloop": 3, "said": [3, 4], "doesn": [3, 8], "add_read": 3, "requir": [3, 5], "pleas": 3, "switch": 3, "built": 3, "selectoreventloop": 3, "chang": 3, "selector": 3, "platform": 3, "sy": 3, "lower": [3, 8], "win32": 3, "o": [3, 8], "name": 3, "nt": 3, "set_event_loop_polici": 3, "windowsselectoreventlooppolici": 3, "applic": [3, 7], "usual": [3, 6], "under": 3, "bsd": 3, "claus": 3, "dual": 3, "One": 3, "so": [3, 7, 8], "eclips": 3, "distribut": 3, "v1": 3, "It": [3, 4], "almost": [3, 5], "word": 3, "ident": 3, "copyright": 3, "owner": 3, "edl": 3, "holder": 3, "foundat": 3, "inc": 3, "re": [3, 4, 8], "happi": [3, 4], "read": [3, 4], "md": 3, "adher": 3, "semant": 3, "break": [3, 5], "occur": 3, "major": [3, 5], "x": 3, "releas": 3, "live": 3, "follow": 3, "principl": 3, "keep": [3, 8], "what": [3, 8], "look": [3, 5], "There": 3, "few": 3, "synchron": 3, "micropython": 3, "asynchron": 3, "microcontrol": 3, "gmqtt": 3, "fastapi": 3, "wrapper": 3, "around": 3, "simplifi": 3, "integr": 3, "amqtt": 3, "includ": 3, "trio": 3, "base": [3, 8], "aim": [4, 5], "cover": 4, "everyth": [4, 8], "know": 4, "project": 4, "expect": [4, 5], "knowledg": 4, "thing": [4, 6], "clearli": 4, "possibl": [4, 8], "stuck": 4, "hesit": 4, "discuss": 4, "help": [4, 5], "recommend": 4, "hivemq": 4, "essenti": 4, "afterward": [4, 8], "oasi": [4, 8], "specif": [4, 8], "great": 4, "realpython": 4, "walkthrough": 4, "doc": 4, "big": 4, "open": 4, "issu": 4, "pull": 4, "find": 4, "error": 4, "have": [4, 5, 6, 7, 8], "idea": [4, 8], "improv": 4, "easi": [4, 8], "unclear": 4, "newcom": 4, "alreadi": 4, "familiar": 4, "fresh": 4, "ey": 4, "here": [4, 8], "That": [4, 8], "let": [4, 6, 8], "dive": 4, "introduc": [5, 8], "page": 5, "relev": 5, "deprec": 5, "been": 5, "filtered_messag": 5, "unfiltered_messag": 5, "replac": 5, "singl": [5, 8], "wide": 5, "renam": 5, "best": 5, "interchang": 5, "place": 5, "design": [5, 7], "howev": [5, 6], "them": [5, 7, 8], "manual": 5, "final": 5, "equival": 5, "forc": 5, "throw": 5, "cannot": 5, "cleanli": 5, "three": [5, 6], "These": 5, "describ": 5, "caus": 5, "unfilt": 5, "handl": [5, 8], "f": [5, 7, 8], "In": [5, 8], "twice": [5, 8], "isol": 5, "underneath": 5, "invok": 5, "maintain": 5, "hold": 5, "continu": 5, "e": [5, 8], "g": [5, 8], "becaus": [5, 8], "special": 5, "build": 5, "distributor": [5, 8], "top": 5, "temperature_consum": [5, 8], "while": [5, 7, 8], "temperature_queu": [5, 8], "humidity_consum": [5, 8], "humidity_queu": [5, 8], "sort": [5, 8], "appropri": [5, 8], "put_nowait": [5, 8], "elif": [5, 8], "group": [5, 8], "taskgroup": [5, 8], "tg": [5, 8], "queue_class": [5, 8], "queue_maxs": [5, 8], "move": 5, "max_queued_messag": 5, "message_retry_set": 5, "transmit": 6, "stream": 6, "accept": 6, "convert": 6, "struct": 6, "pack": 6, "object": 6, "specifi": 6, "zero": 6, "length": 6, "sent": 6, "standard": 6, "implement": [6, 8], "dict": 6, "json": 6, "dump": 6, "On": 6, "end": 6, "load": 6, "decod": 6, "reliabl": 6, "one": [6, 8], "At": 6, "most": 6, "guarante": 6, "deliveri": 6, "fastest": 6, "least": 6, "deliv": 6, "possibli": 6, "sender": 6, "until": [6, 8], "receipt": 6, "exactli": 6, "four": 6, "handshak": 6, "slowest": 6, "two": 6, "same": 6, "defin": 6, "relai": 6, "recent": 6, "last": 6, "after": 6, "thei": [6, 8], "per": 6, "previou": [6, 8], "overwritten": 6, "delet": 6, "empti": 6, "necessari": 6, "overwrit": 6, "ones": 6, "inher": 7, "unstabl": 7, "fail": 7, "especi": 7, "challeng": 7, "resili": 7, "failur": 7, "abl": 7, "detect": 7, "recov": 7, "reusabl": 7, "reentrant": 7, "interv": 7, "lost": 7, "sleep": [7, 8], "intern": 8, "wildcard": 8, "now": 8, "appear": 8, "consol": 8, "imagin": 8, "measur": 8, "provid": 8, "sequenti": 8, "basi": 8, "modifi": 8, "ascendingli": 8, "ti": 8, "sai": 8, "priorit": 8, "custompriorityqueu": 8, "_put": 8, "item": 8, "assign": 8, "super": 8, "_get": 8, "By": 8, "simul": 8, "bound": 8, "spawn": 8, "coroutin": 8, "sens": 8, "cpu": 8, "should": 8, "snippet": 8, "abov": 8, "sometim": 8, "notic": 8, "program": 8, "finish": 8, "practic": 8, "gather": 8, "11": 8, "alongsid": 8, "slept": 8, "fire": 8, "care": 8, "rais": 8, "propag": 8, "explicitli": 8, "unhandl": 8, "silent": 8, "ignor": 8, "background_task": 8, "save": 8, "garbag": 8, "collect": 8, "add": 8, "add_done_callback": 8, "infinit": 8, "do": 8, "someth": 8, "els": 8, "might": 8, "done": 8, "certain": 8, "neat": 8, "featur": 8, "result": 8, "timeouterror": 8}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"alongsid": 0, "fastapi": 0, "co": 0, "connect": 1, "broker": 1, "share": 1, "persist": 1, "session": 1, "develop": 2, "interfac": 2, "client": [2, 3, 5], "messag": [2, 5, 6, 8], "topic": [2, 8], "wildcard": 2, "The": [3, 8], "idiomat": 3, "asyncio": 3, "mqtt": 3, "instal": 3, "note": 3, "window": 3, "user": 3, "licens": 3, "contribut": 3, "version": 3, "changelog": 3, "relat": 3, "project": 3, "introduct": 4, "migrat": 5, "guid": 5, "v2": 5, "0": 5, "chang": 5, "lifecycl": 5, "queue": [5, 8], "argument": 5, "publish": 6, "payload": 6, "encod": 6, "qualiti": 6, "servic": 6, "qo": 6, "retain": 6, "reconnect": 7, "subscrib": 8, "filter": 8, "process": 8, "concurr": 8, "multipl": 8, "listen": 8, "without": 8, "block": 8, "stop": 8, "after": 8, "timeout": 8}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"Alongside FastAPI & Co.": [[0, "alongside-fastapi-co"]], "Connecting to the broker": [[1, "connecting-to-the-broker"]], "Sharing the connection": [[1, "sharing-the-connection"]], "Persistent sessions": [[1, "persistent-sessions"]], "Developer interface": [[2, "developer-interface"]], "Client": [[2, "client"]], "Message": [[2, "message"]], "Topic": [[2, "topic"]], "Wildcard": [[2, "wildcard"]], "The idiomatic asyncio MQTT client": [[3, "the-idiomatic-asyncio-mqtt-client"]], "Installation": [[3, "installation"]], "Note for Windows users": [[3, "note-for-windows-users"]], "License": [[3, "license"]], "Contributing": [[3, "contributing"]], "Versioning": [[3, "versioning"]], "Changelog": [[3, "changelog"]], "Related projects": [[3, "related-projects"]], "Introduction": [[4, "introduction"]], "Migration guide: v2.0.0": [[5, "migration-guide-v2-0-0"]], "Changes to the client lifecycle": [[5, "changes-to-the-client-lifecycle"]], "Changes to the message queue": [[5, "changes-to-the-message-queue"]], "Changes to client arguments": [[5, "changes-to-client-arguments"]], "Publishing a message": [[6, "publishing-a-message"]], "Payload encoding": [[6, "payload-encoding"]], "Quality of Service (QoS)": [[6, "quality-of-service-qos"]], "Retained messages": [[6, "retained-messages"]], "Reconnection": [[7, "reconnection"]], "Subscribing to a topic": [[8, "subscribing-to-a-topic"]], "Filtering messages": [[8, "filtering-messages"]], "The message queue": [[8, "the-message-queue"]], "Processing concurrently": [[8, "processing-concurrently"]], "Multiple queues": [[8, "multiple-queues"]], "Listening without blocking": [[8, "listening-without-blocking"]], "Stop listening": [[8, "stop-listening"]], "Stop listening after timeout": [[8, "stop-listening-after-timeout"]]}, "indexentries": {}}) \ No newline at end of file diff --git a/subscribing-to-a-topic.html b/subscribing-to-a-topic.html index 94edf9b..6bf616f 100644 --- a/subscribing-to-a-topic.html +++ b/subscribing-to-a-topic.html @@ -166,15 +166,16 @@
  • Reconnection
  • Alongside FastAPI & Co.
  • -

    API reference

    +

    reference

    Project links

    @@ -213,17 +214,16 @@

    Subscribing to a topic#

    -

    To receive messages for a topic, we need to subscribe to it and listen for messages. This is a minimal working example that listens for messages to the temperature/# wildcard:

    +

    To receive messages for a topic, we need to subscribe to it. Incoming messages are queued internally. You can use the Client.message generator to iterate over incoming messages. This is a minimal working example that listens for messages to the temperature/# wildcard:

    import asyncio
     import aiomqtt
     
     
     async def main():
         async with aiomqtt.Client("test.mosquitto.org") as client:
    -        async with client.messages() as messages:
    -            await client.subscribe("temperature/#")
    -            async for message in messages:
    -                print(message.payload)
    +        await client.subscribe("temperature/#")
    +        async for message in client.messages:
    +            print(message.payload)
     
     
     asyncio.run(main())
    @@ -244,16 +244,15 @@ 

    Filtering messagesasync def main(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - async for message in messages: - if message.topic.matches("humidity/inside"): - print(f"[humidity/outside] {message.payload}") - if message.topic.matches("+/outside"): - print(f"[+/inside] {message.payload}") - if message.topic.matches("temperature/#"): - print(f"[temperature/#] {message.payload}") + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + if message.topic.matches("humidity/inside"): + print(f"[humidity/inside] {message.payload}") + if message.topic.matches("+/outside"): + print(f"[+/outside] {message.payload}") + if message.topic.matches("temperature/#"): + print(f"[temperature/#] {message.payload}") asyncio.run(main()) @@ -270,8 +269,8 @@

    Filtering messages

    The message queue#

    -

    Messages are queued and returned sequentially from Client.messages().

    -

    The default queue is asyncio.Queue which returns messages on a FIFO (“first in first out”) basis. You can pass other types of asyncio queues as queue_class to Client.messages() to modify the order in which messages are returned, e.g. asyncio.LifoQueue.

    +

    Messages are queued internally and returned sequentially from Client.messages.

    +

    The default queue is asyncio.Queue which returns messages on a FIFO (“first in first out”) basis. You can pass other types of asyncio queues as queue_class to the Client to modify the order in which messages are returned, e.g. asyncio.LifoQueue.

    You can subclass asyncio.PriorityQueue to queue based on priority. Messages are returned ascendingly by their priority values. In the case of ties, messages with lower message identifiers are returned first.

    Let’s say we measure temperature and humidity again, but we want to prioritize humidity:

    import asyncio
    @@ -290,12 +289,13 @@ 

    The message queueasync def main(): - async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages(queue_class=CustomPriorityQueue) as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - async for message in messages: - print(message.payload) + async with aiomqtt.Client( + "test.mosquitto.org", queue_class=CustomPriorityQueue + ) as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + print(message.payload) asyncio.run(main()) @@ -308,7 +308,7 @@

    The message queue

    Processing concurrently#

    -

    Messages are queued and returned sequentially from Client.messages(). If a message takes a long time to handle, it blocks the handling of other messages.

    +

    Messages are queued internally and returned sequentially from Client.messages. If a message takes a long time to handle, it blocks the handling of other messages.

    You can handle messages concurrently by using an asyncio.TaskGroup like so:

    +
    +

    Multiple queues#

    The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic.

    The idea here is to implement a “distributor” that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let’s see how this works for our temperature and humidity messages:

    import asyncio
    @@ -358,19 +361,18 @@ 

    Processing concurrentlyasync def distributor(client): - async with client.messages() as messages: - await client.subscribe("temperature/#") - await client.subscribe("humidity/#") - # Sort messages into the appropriate queues - async for message in messages: - if message.topic.matches("temperature/#"): - temperature_queue.put_nowait(message) - elif message.topic.matches("humidity/#"): - humidity_queue.put_nowait(message) + # Sort messages into the appropriate queues + async for message in client.messages: + if message.topic.matches("temperature/#"): + temperature_queue.put_nowait(message) + elif message.topic.matches("humidity/#"): + humidity_queue.put_nowait(message) async def main(): async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") # Use a task group to manage and await all tasks async with asyncio.TaskGroup() as tg: tg.create_task(distributor(client)) @@ -403,13 +405,13 @@

    Listening without blockingasync def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): + # Use a task group to manage and await all tasks async with asyncio.TaskGroup() as tg: tg.create_task(sleep(2)) tg.create_task(listen()) # Start the listener task @@ -432,10 +434,9 @@

    Listening without blockingasync def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) background_tasks = set() @@ -471,10 +472,9 @@

    Stop listeningasync def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): @@ -505,10 +505,9 @@

    Stop listening after timeoutasync def listen(): async with aiomqtt.Client("test.mosquitto.org") as client: - async with client.messages() as messages: - await client.subscribe("temperature/#") - async for message in messages: - print(message.payload) + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) async def main(): @@ -593,6 +592,7 @@

    Stop listening after timeoutFiltering messages
  • The message queue
  • Processing concurrently
  • +
  • Multiple queues
  • Listening without blocking
  • Stop listening
  • Stop listening after timeout