Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/resolve flaky error in tests #872

Merged
merged 10 commits into from
Nov 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ public void testWillMessageIsFiredOnClientKeepAliveExpiry() throws Exception {
@Test
public void testRejectConnectWithEmptyClientID() throws InterruptedException {
LOG.info("*** testRejectConnectWithEmptyClientID ***");
m_client.clientId("").connect();

this.receivedMsg = this.m_client.lastReceivedMessage();
this.receivedMsg = m_client.clientId("").connect();

assertTrue(receivedMsg instanceof MqttConnAckMessage);
MqttConnAckMessage connAck = (MqttConnAckMessage) receivedMsg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public void tearDown() throws Exception {
super.tearDown();
}

void connectLowLevel() {
void connectLowLevel() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
assertConnectionAccepted(connAck, "Connection must be accepted");
}

void connectLowLevel(int keepAliveSecs) {
void connectLowLevel(int keepAliveSecs) throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE);
assertConnectionAccepted(connAck, "Connection must be accepted");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testAckResponseProperties() {
}

@Test
public void testAssignedClientIdentifier() {
public void testAssignedClientIdentifier() throws InterruptedException {
Client unnamedClient = new Client("localhost").clientId("");
connAck = unnamedClient.connectV5();
assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void simpleConnect() {
}

@Test
public void sendConnectOnDisconnectedConnection() {
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
TestUtils.assertConnectionAccepted(connAck, "Connection must be accepted");
lowLevelClient.disconnect();
Expand All @@ -85,7 +85,7 @@ public void sendConnectOnDisconnectedConnection() {
}

@Test
public void receiveInflightPublishesAfterAReconnect() {
public void receiveInflightPublishesAfterAReconnect() throws InterruptedException {
final Mqtt5BlockingClient publisher = MqttClient.builder()
.useMqttVersion5()
.identifier("publisher")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
// sixth should exceed quota and the client should get a disconnect
sendQoS2Publish();

MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMsg.fixedHeader().messageType(),
"On sixth in flight message the send quota is exhausted and response should be DISCONNECT");
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMsg.variableHeader();
Expand All @@ -83,8 +82,8 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
assertTrue(lowLevelClient.isConnectionLost(), "Connection MUST be closed by the server");
}

private void verifyReceived(MqttMessageType expectedMessageType) {
MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
private void verifyReceived(MqttMessageType expectedMessageType) throws InterruptedException {
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(expectedMessageType, receivedMsg.fixedHeader().messageType());
}

Expand All @@ -94,7 +93,7 @@ private void sendQoS2Publish() {
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, MqttProperties.NO_PROPERTIES);
ByteBuf payload = Unpooled.wrappedBuffer("18°C".getBytes(StandardCharsets.UTF_8));
MqttPublishMessage publishQoS2 = new MqttPublishMessage(fixedHeader, variableHeader, payload);
lowLevelClient.publish(publishQoS2, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS2);
}

@Override
Expand Down Expand Up @@ -159,6 +158,7 @@ public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectl
// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);

verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS);
MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(500));
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import io.moquette.logging.LoggingUtils;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
Expand All @@ -43,16 +47,21 @@
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThat;

public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(PayloadFormatIndicatorTest.class);

// second octet is invalid
public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28};
Expand All @@ -64,19 +73,23 @@ public String clientName() {

@Test
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
LOG.info("givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent TEST START");
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
Mqtt5SubAck subAck = subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1);
LOG.info("SUBACK received");

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_MOST_ONCE)
.qos(MqttQos.AT_LEAST_ONCE)
andsel marked this conversation as resolved.
Show resolved Hide resolved
.send();
LOG.info("PUB QoS1 sent");

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
Expand Down Expand Up @@ -168,10 +181,10 @@ public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetT
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props);
MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES));
// in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT
lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS0);

// Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed
final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage();
final MqttMessage receivedMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType());
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader();
assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public String clientName() {
}

@Test
public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() {
public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() throws InterruptedException {
LOG.info("givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected START");
connectLowLevel();

MqttMessage received = lowLevelClient.subscribeWithError("$share/+/measures/temp", MqttQoS.AT_LEAST_ONCE);
Expand All @@ -57,7 +58,8 @@ public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisco
}

@Test
public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() {
public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() throws InterruptedException {
LOG.info("givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK START");
connectLowLevel();

MqttMessage received = lowLevelClient.subscribeWithError("$share/metrics/measures/temp", MqttQoS.AT_LEAST_ONCE);
Expand All @@ -70,7 +72,8 @@ public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptio
}

@Test
public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException {
public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException, InterruptedException {
LOG.info("givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse START");
// stop already started broker instance
stopServer();

Expand All @@ -93,7 +96,8 @@ public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWit


@Test
public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException {
public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException, InterruptedException {
LOG.info("givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared START");
// stop already started broker instance
stopServer();

Expand Down Expand Up @@ -131,6 +135,7 @@ protected void startServer(String dbPath, IAuthorizatorPolicy authPolicy) throws

@Test
public void givenASharedSubscriptionClientReceivesANotification() throws Exception {
LOG.info("givenASharedSubscriptionClientReceivesANotification START");
final Mqtt5BlockingClient subscriberClient = createSubscriberClient();
subscriberClient.subscribeWith()
.topicFilter("$share/collectors/metric/temperature/#")
Expand All @@ -146,6 +151,7 @@ public void givenASharedSubscriptionClientReceivesANotification() throws Excepti

@Test
public void givenAClientWithOverlappingSharedSubscriptionsThenReceivesMultiplePublishes() throws InterruptedException {
LOG.info("givenAClientWithOverlappingSharedSubscriptionsThenReceivesMultiplePublishes START");
// Connect a subscriber client
lowLevelClient = new Client("localhost").clientId(clientName());

Expand Down Expand Up @@ -184,6 +190,7 @@ private static void verifyPubPayload(MqttMessage received, String expectedPayloa

@Test
public void whenAClientSubscribeToASharedTopicThenDoesntReceiveAnyRetainedMessagedOnTheMatchingTopicFilter() throws InterruptedException {
LOG.info("whenAClientSubscribeToASharedTopicThenDoesntReceiveAnyRetainedMessagedOnTheMatchingTopicFilter START");
// publish a message with retained on a shared topic
Mqtt5BlockingClient publisherClient = createPublisherClient();
publisherClient.publishWith()
Expand Down Expand Up @@ -214,6 +221,7 @@ public void whenAClientSubscribeToASharedTopicThenDoesntReceiveAnyRetainedMessag

@Test
public void givenSharedSubscriptionWithCertainQoSWhenSameClientWithSameShareSubscribeToSameTopicFilterThenQoSUpdates() throws Exception {
LOG.info("givenSharedSubscriptionWithCertainQoSWhenSameClientWithSameShareSubscribeToSameTopicFilterThenQoSUpdates START");
final Mqtt5BlockingClient subscriberClient = createSubscriberClient();
subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_MOST_ONCE);

Expand Down Expand Up @@ -250,6 +258,7 @@ static void subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter,

@Test
public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeThenTheSharedSubscriptionRemainsValid() throws Exception {
LOG.info("givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeThenTheSharedSubscriptionRemainsValid START");
String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living";

// subscribe first client to shared subscription
Expand Down Expand Up @@ -283,6 +292,7 @@ public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeT

@Test
public void givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSharedSubscriptionCeasesToExist() throws Exception {
LOG.info("givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSharedSubscriptionCeasesToExist START");
String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living";

// subscribe client to shared subscription
Expand Down Expand Up @@ -313,6 +323,7 @@ public void givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSh

@Test
public void givenASharedSubscriptionWhenLastSubscribedClientSessionTerminatesThenTheSharedSubscriptionCeasesToExist() throws Exception {
LOG.info("givenASharedSubscriptionWhenLastSubscribedClientSessionTerminatesThenTheSharedSubscriptionCeasesToExist START");
String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living";

// subscribe client to shared subscription
Expand All @@ -338,6 +349,7 @@ public void givenASharedSubscriptionWhenLastSubscribedClientSessionTerminatesThe

@Test
public void givenASharedSubscriptionWhenBrokerRestartsAndClientReconnectsThenSharedSubscriptionIsReloaded() throws Exception {
LOG.info("givenASharedSubscriptionWhenBrokerRestartsAndClientReconnectsThenSharedSubscriptionIsReloaded START");
String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living";

// subscribe client to shared subscription
Expand Down
Loading
Loading