diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java index a14595634..1d92e1fe7 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java @@ -72,14 +72,21 @@ public String clientName() { } @Test - public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException { LOG.info("givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent TEST START"); - Mqtt5BlockingClient subscriber = createSubscriberClient(); - Mqtt5SubAck subAck = subscriber.subscribeWith() - .topicFilter("temperature/living") - .qos(MqttQos.AT_LEAST_ONCE) - .send(); - assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1); +// Mqtt5BlockingClient subscriber = createSubscriberClient(); +// Mqtt5SubAck subAck = subscriber.subscribeWith() +// .topicFilter("temperature/living") +// .qos(MqttQos.AT_LEAST_ONCE) +// .send(); +// assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1); + MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); + client.connect(); + MqttSubscription subscription = new MqttSubscription("temperature/living", 1); + SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); + IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, + new IMqttMessageListener[] {publishCollector}); + TestUtils.verifySubscribedSuccessfully(subscribeToken); LOG.info("SUBACK received"); Mqtt5BlockingClient publisher = createPublisherClient(); @@ -91,9 +98,16 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen .send(); LOG.info("PUB QoS1 sent"); - verifyPublishMessage(subscriber, msgPub -> { - assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); - }); +// verifyPublishMessage(subscriber, msgPub -> { +// assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); +// }); + // Verify the message is also reflected back to the sender + publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); + assertEquals("temperature/living", publishCollector.receivedTopic()); + assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match"); + org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); + assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); + assertTrue(receivedMessage.getProperties().getPayloadFormat()); } @Test