diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index 14ab71198..c65bdde50 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -22,12 +22,9 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttProperties; -import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; @@ -43,8 +40,10 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; -import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MessageExpirationTest extends AbstractServerIntegrationTest { @Override diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 1eafab0ad..161a1d3a5 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -26,15 +26,21 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture { @@ -60,7 +66,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); - responder.toAsync().subscribe(subscribeToRequest, + CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); Mqtt5PublishResult responseResult = responder.publishWith() @@ -74,6 +80,9 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo "Open door response cannot be published "); }); + // wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost + waitForSubAck(subackFuture); + Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() .topic("requester/door/open") .responseTopic(responseTopic) @@ -104,7 +113,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); - responder.toAsync().subscribe(subscribeToRequest, + CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); assertTrue(pub.getCorrelationData().isPresent(), "Correlation data MUST defined in request publish"); @@ -115,6 +124,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW .send(); assertFalse(responseResult.getError().isPresent(), "Open door response cannot be published "); }); + waitForSubAck(subackFuture); Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() .topic("requester/door/open") @@ -136,6 +146,20 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW }); } + private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) { + try { + Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS); + assertEquals(1, mqtt5SubAck.getReasonCodes().size()); + assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next()); + } catch (InterruptedException e) { + fail("Sub ack waiting interrupted before 1 sec expires"); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + fail("Sub ack didn't arrive in 1 second timeout"); + } + } + private byte[] asByteArray(ByteBuffer byteBuffer) { byte[] arr = new byte[byteBuffer.remaining()]; byteBuffer.get(arr); diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index b406bcf38..e9bba018e 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -51,7 +51,7 @@ public interface ICallback { private static final Logger LOG = LoggerFactory.getLogger(Client.class); - private static final Duration TIMEOUT_DURATION = Duration.ofMillis(300); + private static final Duration TIMEOUT_DURATION = Duration.ofMillis(500); final ClientNettyMQTTHandler handler = new ClientNettyMQTTHandler(); EventLoopGroup workerGroup;