Skip to content

Commit

Permalink
deploy: be0fddf
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] committed Jan 15, 2024
1 parent 209c83d commit 3ee3b0e
Show file tree
Hide file tree
Showing 27 changed files with 806 additions and 194 deletions.
Binary file modified .doctrees/alongside-fastapi-and-co.doctree
Binary file not shown.
Binary file modified .doctrees/connecting-to-the-broker.doctree
Binary file not shown.
Binary file modified .doctrees/developer-interface.doctree
Binary file not shown.
Binary file modified .doctrees/environment.pickle
Binary file not shown.
Binary file modified .doctrees/index.doctree
Binary file not shown.
Binary file added .doctrees/migration-guide-v2.doctree
Binary file not shown.
Binary file modified .doctrees/reconnection.doctree
Binary file not shown.
Binary file modified .doctrees/subscribing-to-a-topic.doctree
Binary file not shown.
7 changes: 3 additions & 4 deletions _sources/alongside-fastapi-and-co.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import fastapi


async def listen(client):
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
async for message in client.messages:
print(message.payload)


client = None
Expand All @@ -28,6 +26,7 @@ async def lifespan(app):
# Make client globally available
client = c
# Listen for MQTT messages in (unawaited) asyncio task
await client.subscribe("humidity/#")
loop = asyncio.get_event_loop()
task = loop.create_task(listen(client))
yield
Expand Down
2 changes: 1 addition & 1 deletion _sources/connecting-to-the-broker.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The connection to the broker is managed by the `Client` context manager. This co
Context managers make it easier to manage resources like network connections or files by ensuring that their teardown logic is always executed -- even in case of an exception.

```{tip}
If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods directly as a workaround, similar to how you would use manual `connect` and `disconnect` methods. With this approach you need to make sure that `___aexit___` is also called in case of an exception. Avoid this workaround if you can, it's a bit tricky to get right.
If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods to connect and disconnect as a workaround. With this approach you need to ensure yourself that `___aexit___` is also called in case of an exception. Avoid this workaround if you can, it's a bit tricky to get right.
```

```{note}
Expand Down
5 changes: 3 additions & 2 deletions _sources/index.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ alongside-fastapi-and-co
```

```{toctree}
:caption: API reference
:caption: reference
:hidden:

developer-interface
migration-guide-v2
```

```{toctree}
Expand All @@ -39,7 +40,7 @@ developer-interface

GitHub <https://github.com/sbtinstruments/aiomqtt>
Issue tracker <https://github.com/sbtinstruments/aiomqtt/issues>
Discussions <https://github.com/sbtinstruments/aiomqtt/discussions>
Changelog <https://github.com/sbtinstruments/aiomqtt/blob/main/CHANGELOG.md>
Contributing <https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md>
PyPI <https://pypi.org/project/aiomqtt/>
```
154 changes: 154 additions & 0 deletions _sources/migration-guide-v2.md.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Migration guide: v2.0.0

Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are:

- The deprecated `connect` and `disconnect` methods have been removed
- The deprecated `filtered_messages` and `unfiltered_messages` methods have been removed
- User-managed queues for incoming messages have been replaced with a single client-wide queue
- Some arguments to the `Client` have been renamed or removed

## Changes to the client lifecycle

The deprecated `connect` and `disconnect` methods have been removed. The best way to connect and disconnect from the broker is through the client's context manager:

```python
import asyncio
import aiomqtt


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.publish("temperature/outside", payload=28.4)


asyncio.run(main())
```

If your use case does not allow you to use a context manager, you can use the client’s `__aenter__` and `__aexit__` methods almost interchangeably in place of the removed `connect` and `disconnect` methods.

The `__aenter__` and `__aexit__` methods are designed to be called by the `async with` statement when the execution enters and exits the context manager. However, we can also execute them manually:

```python
import asyncio
import aiomqtt


async def main():
client = aiomqtt.Client("test.mosquitto.org")
await client.__aenter__()
try:
await client.publish("temperature/outside", payload=28.4)
finally:
await client.__aexit__(None, None, None)


asyncio.run(main())
```

`__aenter__` is equivalent to `connect`. `__aexit__` is equivalent to `disconnect` except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly.

```{note}
`__aexit__` expects three arguments: `exc_type`, `exc`, and `tb`. These arguments describe the exception that caused the context manager to exit, if any. You can pass `None` to all of these arguments in a manual call to `__aexit__`.
```

## Changes to the message queue

The `filtered_messages`, `unfiltered_messages`, and `messages` methods have been removed and replaced with a single client-wide message queue.

A minimal example of printing all messages (unfiltered) looks like this:

```python
import asyncio
import aiomqtt


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async for message in client.messages:
print(message.payload)


asyncio.run(main())
```

To handle messages from different topics differently, we can use `Topic.matches()`:

```python
import asyncio
import aiomqtt


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
async for message in client.messages:
if message.topic.matches("humidity/inside"):
print(f"[humidity/inside] {message.payload}")
if message.topic.matches("+/outside"):
print(f"[+/outside] {message.payload}")
if message.topic.matches("temperature/#"):
print(f"[temperature/#] {message.payload}")


asyncio.run(main())
```

```{note}
In our example, messages to `temperature/outside` are handled twice!
```

The `filtered_messages`, `unfiltered_messages`, and `messages` methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via `Client.messages`.

If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a "distributor" on top:

```python
import asyncio
import aiomqtt


async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")


async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")


temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()


async def distributor(client):
# Sort messages into the appropriate queues
async for message in client.messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())


asyncio.run(main())
```

## Changes to client arguments

- The `queue_class` and `queue_maxsize` arguments to `filtered_messages`, `unfiltered_messages`, and `messages` have been moved to the `Client` and have been renamed to `queue_type` and `max_queued_incoming_messages`
- The `max_queued_messages` client argument has been renamed to `max_queued_outgoing_messages`
- The deprecated `message_retry_set` client argument has been removed
7 changes: 3 additions & 4 deletions _sources/reconnection.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ async def main():
while True:
try:
async with client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
await client.subscribe("humidity/#")
async for message in client.messages:
print(message.payload)
except aiomqtt.MqttError:
print(f"Connection lost; Reconnecting in {interval} seconds ...")
await asyncio.sleep(interval)
Expand Down
Loading

0 comments on commit 3ee3b0e

Please sign in to comment.