Skip to content

Commit

Permalink
Swithed subscriber in test from Hive to Paho, becuase Hive despite re…
Browse files Browse the repository at this point in the history
…ceives the PUB doesn't forward imemdiately
  • Loading branch information
andsel committed Nov 23, 2024
1 parent 27f8699 commit 75a6734
Showing 1 changed file with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,21 @@ public String clientName() {
}

@Test
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException {
LOG.info("givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent TEST START");
Mqtt5BlockingClient subscriber = createSubscriberClient();
Mqtt5SubAck subAck = subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1);
// Mqtt5BlockingClient subscriber = createSubscriberClient();
// Mqtt5SubAck subAck = subscriber.subscribeWith()
// .topicFilter("temperature/living")
// .qos(MqttQos.AT_LEAST_ONCE)
// .send();
// assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1);
MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("temperature/living", 1);
SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
TestUtils.verifySubscribedSuccessfully(subscribeToken);
LOG.info("SUBACK received");

Mqtt5BlockingClient publisher = createPublisherClient();
Expand All @@ -91,9 +98,16 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen
.send();
LOG.info("PUB QoS1 sent");

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
});
// verifyPublishMessage(subscriber, msgPub -> {
// assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
// });
// Verify the message is also reflected back to the sender
publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS);
assertEquals("temperature/living", publishCollector.receivedTopic());
assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match");
org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage();
assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos());
assertTrue(receivedMessage.getProperties().getPayloadFormat());
}

@Test
Expand Down

0 comments on commit 75a6734

Please sign in to comment.