Skip to content

Commit

Permalink
[Test] add integration test to prove that message expiry works for en…
Browse files Browse the repository at this point in the history
…queued messages
  • Loading branch information
andsel committed Mar 24, 2024
1 parent 7cf57eb commit 2976f26
Showing 1 changed file with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.moquette.integration.mqtt5.ConnectTest.verifyNoPublish;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.*;

public class MessageExpirationTest extends AbstractServerIntegrationTest {
Expand Down Expand Up @@ -99,4 +99,64 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc
assertNotNull(messageExpiryProperty, "message expiry property must be present");
assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered");
}

@Test
public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExipiryIsNotPublished() throws InterruptedException {
connectLowLevel();

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Mqtt5BlockingClient publisher = createPublisherClient();
int inflightWindowSize = 10;
int messageExpiryInterval = 2; // seconds
// fill the in flight window so that messages starts to be enqueued
fillInFlightWindow(inflightWindowSize, publisher, messageExpiryInterval);

// send another message, which is enqueued and has an expiry of messageExpiryInterval seconds
publisher.publishWith()
.topic("temperature/living")
.payload(("Enqueued").getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE) // Broker enqueues only QoS1 and QoS2
.messageExpiryInterval(messageExpiryInterval)
.send();

// let time flow so that the message in queue passes its expiry time
Thread.sleep(Duration.ofSeconds(messageExpiryInterval + 1).toMillis());

// now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-"
consumesPublishesInflifhtWindow(inflightWindowSize);

MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(100));
assertNull(mqttMessage, "No other messages MUST be received after consuming the in flight window");
}

private void consumesPublishesInflifhtWindow(int inflightWindowSize) throws InterruptedException {
for (int i = 0; i < inflightWindowSize; i++) {
MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(200));
assertTrue(mqttMessage instanceof MqttPublishMessage);
MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
assertEquals(Integer.toString(i), publish.payload().toString(StandardCharsets.UTF_8));
int packetId = publish.variableHeader().packetId();

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
false, 0);
MqttPubAckMessage pubAck = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
lowLevelClient.sendMessage(pubAck);
}
}

private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClient publisher, int messageExpiryInterval) {
for (int i = 0; i < inflightWindowSize; i++) {
publisher.publishWith()
.topic("temperature/living")
.payload(Integer.toString(i).getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE) // Broker enqueues only QoS1 and QoS2
.messageExpiryInterval(messageExpiryInterval)
.send();
}
}
}

0 comments on commit 2976f26

Please sign in to comment.