Skip to content

Commit

Permalink
Rewritten retained verify test to use Paho becuase HiveMQ client does…
Browse files Browse the repository at this point in the history
…n't forward the publish
  • Loading branch information
andsel committed Apr 20, 2024
1 parent 8d00d30 commit b07a8c9
Showing 1 changed file with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -57,7 +64,7 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen
}

@Test
public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException {
Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
Expand All @@ -67,14 +74,20 @@ public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscr
.qos(MqttQos.AT_LEAST_ONCE) // retained works for QoS > 0
.send();

Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_LEAST_ONCE)
.send();
MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("temperature/living", 1);
SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
TestUtils.verifySubscribedSuccessfully(subscribeToken);

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
});
// Verify the message is also reflected back to the sender
publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS);
assertEquals("temperature/living", publishCollector.receivedTopic());
assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match");
org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage();
assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos());
assertTrue(receivedMessage.getProperties().getPayloadFormat());
}
}

0 comments on commit b07a8c9

Please sign in to comment.