Skip to content

Commit

Permalink
Fixed request-response tests (#870)
Browse files Browse the repository at this point in the history
Updates the test fixture so that the published after the subscribe wait for completions (SUBACK receive) of it.
  • Loading branch information
andsel authored Nov 17, 2024
1 parent 82a54aa commit c076193
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.awaitility.Awaitility;
Expand All @@ -43,8 +40,10 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MessageExpirationTest extends AbstractServerIntegrationTest {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture {

Expand All @@ -60,7 +66,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo
.topicFilter("requester/door/open")
.qos(MqttQos.AT_LEAST_ONCE)
.build();
responder.toAsync().subscribe(subscribeToRequest,
CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest,
(Mqtt5Publish pub) -> {
assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish");
Mqtt5PublishResult responseResult = responder.publishWith()
Expand All @@ -74,6 +80,9 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo
"Open door response cannot be published ");
});

// wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost
waitForSubAck(subackFuture);

Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
.responseTopic(responseTopic)
Expand Down Expand Up @@ -104,7 +113,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
.topicFilter("requester/door/open")
.qos(MqttQos.AT_LEAST_ONCE)
.build();
responder.toAsync().subscribe(subscribeToRequest,
CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest,
(Mqtt5Publish pub) -> {
assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish");
assertTrue(pub.getCorrelationData().isPresent(), "Correlation data MUST defined in request publish");
Expand All @@ -115,6 +124,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
.send();
assertFalse(responseResult.getError().isPresent(), "Open door response cannot be published ");
});
waitForSubAck(subackFuture);

Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
Expand All @@ -136,6 +146,20 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
});
}

private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) {
try {
Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS);
assertEquals(1, mqtt5SubAck.getReasonCodes().size());
assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next());
} catch (InterruptedException e) {
fail("Sub ack waiting interrupted before 1 sec expires");
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
fail("Sub ack didn't arrive in 1 second timeout");
}
}

private byte[] asByteArray(ByteBuffer byteBuffer) {
byte[] arr = new byte[byteBuffer.remaining()];
byteBuffer.get(arr);
Expand Down
2 changes: 1 addition & 1 deletion broker/src/test/java/io/moquette/testclient/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public interface ICallback {

private static final Logger LOG = LoggerFactory.getLogger(Client.class);

private static final Duration TIMEOUT_DURATION = Duration.ofMillis(300);
private static final Duration TIMEOUT_DURATION = Duration.ofMillis(500);

final ClientNettyMQTTHandler handler = new ClientNettyMQTTHandler();
EventLoopGroup workerGroup;
Expand Down

0 comments on commit c076193

Please sign in to comment.