diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorAndContentTypeTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorAndContentTypeTest.java index a3f382377..d9dec36a9 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorAndContentTypeTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorAndContentTypeTest.java @@ -22,9 +22,16 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import org.eclipse.paho.mqttv5.client.IMqttMessageListener; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.*; @@ -57,7 +64,7 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen } @Test - public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException { Mqtt5BlockingClient publisher = createPublisherClient(); publisher.publishWith() .topic("temperature/living") @@ -67,14 +74,20 @@ public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscr .qos(MqttQos.AT_LEAST_ONCE) // retained works for QoS > 0 .send(); - Mqtt5BlockingClient subscriber = createSubscriberClient(); - subscriber.subscribeWith() - .topicFilter("temperature/living") - .qos(MqttQos.AT_LEAST_ONCE) - .send(); + MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); + client.connect(); + MqttSubscription subscription = new MqttSubscription("temperature/living", 1); + SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); + IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, + new IMqttMessageListener[] {publishCollector}); + TestUtils.verifySubscribedSuccessfully(subscribeToken); - verifyPublishMessage(subscriber, msgPub -> { - assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); - }); + // Verify the message is also reflected back to the sender + publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); + assertEquals("temperature/living", publishCollector.receivedTopic()); + assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match"); + org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); + assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); + assertTrue(receivedMessage.getProperties().getPayloadFormat()); } }