Skip to content

Commit

Permalink
Merge pull request #169 from quartiq/respond-cd
Browse files Browse the repository at this point in the history
pub: respond default topic
  • Loading branch information
jordens authored Jan 22, 2025
2 parents 677203e + a03f48a commit aace7d1
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This document describes the changes to Minimq between releases.
## Changed
* The `Publication::finish()` API was removed in favor of a new `Publication::respond()` API for
constructing replies to previously received messages.
* `DeferredPublication` has been removed: pass a `FnOnce(&mut [u8])` as payload.
* [breaking] `embedded-nal` bumped. Now `core::net::SocketAddr` and related ip types are used.
MSRV becomes 1.77.0.

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mod will;
pub use broker::Broker;
pub use config::ConfigBuilder;
pub use properties::Property;
pub use publication::{DeferredPublication, Publication};
pub use publication::Publication;
pub use reason_codes::ReasonCode;
pub use will::Will;

Expand Down
37 changes: 8 additions & 29 deletions src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub trait ToPayload {
fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}

impl<'a> ToPayload for &'a [u8] {
impl ToPayload for &[u8] {
type Error = ();

fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
Expand All @@ -21,7 +21,7 @@ impl<'a> ToPayload for &'a [u8] {
}
}

impl<'a> ToPayload for &'a str {
impl ToPayload for &str {
type Error = ();

fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
Expand All @@ -37,33 +37,10 @@ impl<const N: usize> ToPayload for &[u8; N] {
}
}

/// A publication where the payload is serialized directly into the transmission buffer in the
/// future.
///
/// # Note
/// This is "deferred" because the closure will only be called once the publication is actually
/// sent.
pub struct DeferredPublication<F> {
func: F,
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> DeferredPublication<F> {
pub fn new<'a>(topic: &'a str, func: F) -> Publication<'a, Self> {
Publication::new(topic, Self { func })
}

pub fn respond<'a>(
received_properties: &'a Properties<'a>,
func: F,
) -> Result<Publication<'a, Self>, ProtocolError> {
Publication::respond(received_properties, Self { func })
}
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for DeferredPublication<F> {
impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for F {
type Error = E;
fn serialize(self, buffer: &mut [u8]) -> Result<usize, E> {
(self.func)(buffer)
self(buffer)
}
}

Expand All @@ -87,7 +64,7 @@ pub struct Publication<'a, P> {
pub(crate) retain: Retain,
}

impl<'a, P: ToPayload> Publication<'a, P> {
impl<'a, P> Publication<'a, P> {
/// Generate the publication as a reply to some other received message.
///
/// # Note
Expand All @@ -98,8 +75,9 @@ impl<'a, P: ToPayload> Publication<'a, P> {
/// publication properties.
///
/// * If a response topic is identified, the message topic will be
/// configured for it, which will override any previously-specified topic.
/// configured for it, which will override the default topic.
pub fn respond(
default_topic: Option<&'a str>,
received_properties: &'a Properties<'a>,
payload: P,
) -> Result<Self, ProtocolError> {
Expand All @@ -113,6 +91,7 @@ impl<'a, P: ToPayload> Publication<'a, P> {
None
}
})
.or(default_topic)
.ok_or(ProtocolError::NoTopic)?;

let publication = Self::new(response_topic, payload);
Expand Down
4 changes: 2 additions & 2 deletions tests/deferred_payload.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use minimq::{DeferredPublication, Minimq, QoS};
use minimq::{Minimq, Publication, QoS};

use core::net::{IpAddr, Ipv4Addr};
use std_embedded_time::StandardClock;
Expand All @@ -23,7 +23,7 @@ fn main() -> std::io::Result<()> {

assert!(matches!(
mqtt.client().publish(
DeferredPublication::new("data", |_buf| { Err("Oops!") }).qos(QoS::ExactlyOnce)
Publication::new("data", |_buf: &mut [u8]| Err("Oops!")).qos(QoS::ExactlyOnce)
),
Err(minimq::PubError::Serialization("Oops!"))
));
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn main() -> std::io::Result<()> {
.poll(|client, topic, payload, properties| {
log::info!("{} < {}", topic, core::str::from_utf8(payload).unwrap());

if let Ok(response) = Publication::respond(properties, b"Pong") {
if let Ok(response) = Publication::respond(None, properties, b"Pong") {
client.publish(response).unwrap();
}

Expand Down

0 comments on commit aace7d1

Please sign in to comment.