Skip to content

Commit

Permalink
Fixed publish verification to tests, to subscribe to listened action …
Browse files Browse the repository at this point in the history
…before triggering the action
  • Loading branch information
andsel committed Nov 23, 2024
1 parent 254a845 commit 9319da3
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,39 @@ protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<V
}
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Void> action, MqttQos expectedQos,
protected static void verifyNoPublish(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
action.accept(null);
Optional<Mqtt5Publish> publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedMessage.isPresent(), message);
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, MqttQos expectedQos,
String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
}

static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) {
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
static void verifyPublishMessage(Mqtt5BlockingClient.Mqtt5Publishes publishListener, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
Optional<Mqtt5Publish> publishMessage = publishListener.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}
72 changes: 39 additions & 33 deletions broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,30 +180,32 @@ public void avoidToFirePreviouslyScheduledWillWhenSameClientIDReconnects() throw
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);
// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
}
}

private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient testamentSubscriber, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentSubscriber, action, timeout, "No will message should be published");
private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient.Mqtt5Publishes testamentListener, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentListener, action, timeout, "No will message should be published");
}

@Test
Expand All @@ -230,12 +232,14 @@ public void noWillMessageIsFiredOnNormalDisconnection() throws InterruptedExcept
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand All @@ -245,14 +249,16 @@ public void givenClientWithWillThatCleanlyDisconnectsWithWillShouldTriggerTheTes
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
Expand Down Expand Up @@ -65,17 +66,19 @@ public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent()
.send();

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
Expand Down Expand Up @@ -68,13 +69,16 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIs

// subscribe to same topic and verify no message
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(subscriber, v -> {}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(publishes, v -> {
}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
}
}

// TODO verify the elapsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
Expand Down Expand Up @@ -52,13 +53,15 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply(

final Mqtt5BlockingClient responder = createHiveBlockingClient("responder");

responderRepliesToRequesterPublish(responder, requester, responseTopic);
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) {
responderRepliesToRequesterPublish(responder, requester, responseTopic);

verifyPublishMessage(requester, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
});
}
}

private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient responder, Mqtt5BlockingClient requester, String responseTopic) {
Expand Down Expand Up @@ -126,24 +129,26 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
});
waitForSubAck(subackFuture);

Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
.responseTopic(responseTopic)
.correlationData("req-open-door".getBytes(StandardCharsets.UTF_8))
.payload("Please open the door".getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(),
"Open door request cannot be published ");

verifyPublishMessage(requester, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish");
final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get());
assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8));
});
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) {
Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
.responseTopic(responseTopic)
.correlationData("req-open-door".getBytes(StandardCharsets.UTF_8))
.payload("Please open the door".getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(),
"Open door request cannot be published ");

verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish");
final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get());
assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8));
});
}
}

private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) {
Expand Down Expand Up @@ -174,12 +179,14 @@ public void givenRequestResponseProtocolAndClientIsConnectedWhenRequestIsIssueTh

final Mqtt5BlockingClient responder = createHiveBlockingClient("responder");

responderRepliesToRequesterPublish(responder, requester, responseTopic);
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) {
responderRepliesToRequesterPublish(responder, requester, responseTopic);

verifyPublishMessage(requester, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("OK", payload);
});
}
}
}
Loading

0 comments on commit 9319da3

Please sign in to comment.