Skip to content

Commit

Permalink
[Test] Rewritten test to avoid raw client's callback
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 23, 2024
1 parent 4f59932 commit b5278ea
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ public void testWillMessageIsFiredOnClientKeepAliveExpiry() throws Exception {
@Test
public void testRejectConnectWithEmptyClientID() throws InterruptedException {
LOG.info("*** testRejectConnectWithEmptyClientID ***");
m_client.clientId("").connect();

this.receivedMsg = this.m_client.lastReceivedMessage();
this.receivedMsg = m_client.clientId("").connect();

assertTrue(receivedMsg instanceof MqttConnAckMessage);
MqttConnAckMessage connAck = (MqttConnAckMessage) receivedMsg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public void tearDown() throws Exception {
super.tearDown();
}

void connectLowLevel() {
void connectLowLevel() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
assertConnectionAccepted(connAck, "Connection must be accepted");
}

void connectLowLevel(int keepAliveSecs) {
void connectLowLevel(int keepAliveSecs) throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE);
assertConnectionAccepted(connAck, "Connection must be accepted");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testAckResponseProperties() {
}

@Test
public void testAssignedClientIdentifier() {
public void testAssignedClientIdentifier() throws InterruptedException {
Client unnamedClient = new Client("localhost").clientId("");
connAck = unnamedClient.connectV5();
assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void simpleConnect() {
}

@Test
public void sendConnectOnDisconnectedConnection() {
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
TestUtils.assertConnectionAccepted(connAck, "Connection must be accepted");
lowLevelClient.disconnect();
Expand All @@ -85,7 +85,7 @@ public void sendConnectOnDisconnectedConnection() {
}

@Test
public void receiveInflightPublishesAfterAReconnect() {
public void receiveInflightPublishesAfterAReconnect() throws InterruptedException {
final Mqtt5BlockingClient publisher = MqttClient.builder()
.useMqttVersion5()
.identifier("publisher")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand All @@ -43,11 +40,8 @@

public class FlowControlTest extends AbstractServerIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(FlowControlTest.class);

@Test
public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected() throws IOException, InterruptedException {
LOG.info("givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected");
final int serverSendQuota = 5;

// stop existing broker to restart with receiveMaximum configured
Expand Down Expand Up @@ -78,7 +72,7 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
// sixth should exceed quota and the client should get a disconnect
sendQoS2Publish();

MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMsg.fixedHeader().messageType(),
"On sixth in flight message the send quota is exhausted and response should be DISCONNECT");
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMsg.variableHeader();
Expand All @@ -88,8 +82,8 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
assertTrue(lowLevelClient.isConnectionLost(), "Connection MUST be closed by the server");
}

private void verifyReceived(MqttMessageType expectedMessageType) {
MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
private void verifyReceived(MqttMessageType expectedMessageType) throws InterruptedException {
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(expectedMessageType, receivedMsg.fixedHeader().messageType());
}

Expand All @@ -99,7 +93,7 @@ private void sendQoS2Publish() {
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, MqttProperties.NO_PROPERTIES);
ByteBuf payload = Unpooled.wrappedBuffer("18°C".getBytes(StandardCharsets.UTF_8));
MqttPublishMessage publishQoS2 = new MqttPublishMessage(fixedHeader, variableHeader, payload);
lowLevelClient.publish(publishQoS2, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS2);
}

@Override
Expand All @@ -110,7 +104,6 @@ public String clientName() {

@Test
public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient() throws InterruptedException {
LOG.info("givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient");
connectLowLevel();

// subscribe with an identifier
Expand Down Expand Up @@ -158,7 +151,6 @@ private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient

@Test
public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException {
LOG.info("givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose");
// connect subscriber and published
// publisher send 20 events, 10 should be in the inflight, 10 remains on the queue
connectLowLevel();
Expand All @@ -167,7 +159,6 @@ public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectl
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);

LOG.info("\n\n\n\n");
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS);
MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(500));
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import io.moquette.logging.LoggingUtils;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
Expand All @@ -43,16 +47,21 @@
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThat;

public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(PayloadFormatIndicatorTest.class);

// second octet is invalid
public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28};
Expand All @@ -64,11 +73,14 @@ public String clientName() {

@Test
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
LOG.info("givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent TEST START");
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
Mqtt5SubAck subAck = subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();
assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0);
LOG.info("SUBACK received");

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
Expand All @@ -77,6 +89,7 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_MOST_ONCE)
.send();
LOG.info("PUB QoS0 sent");

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
Expand Down Expand Up @@ -168,10 +181,10 @@ public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetT
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props);
MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES));
// in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT
lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS0);

// Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed
final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage();
final MqttMessage receivedMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType());
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader();
assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String clientName() {
}

@Test
public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() {
public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() throws InterruptedException {
connectLowLevel();

MqttMessage received = lowLevelClient.subscribeWithError("$share/+/measures/temp", MqttQoS.AT_LEAST_ONCE);
Expand All @@ -57,7 +57,7 @@ public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisco
}

@Test
public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() {
public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() throws InterruptedException {
connectLowLevel();

MqttMessage received = lowLevelClient.subscribeWithError("$share/metrics/measures/temp", MqttQoS.AT_LEAST_ONCE);
Expand All @@ -70,7 +70,7 @@ public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptio
}

@Test
public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException {
public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException, InterruptedException {
// stop already started broker instance
stopServer();

Expand All @@ -93,7 +93,7 @@ public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWit


@Test
public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException {
public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException, InterruptedException {
// stop already started broker instance
stopServer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void givenNonSharedSubscriptionWithIdentifierWhenPublishMatchedThenReceiv

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("/metrics/measures/temp",
MqttQoS.AT_LEAST_ONCE, 123, 400, TimeUnit.MILLISECONDS);
MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(400));
verifyOfType(received, MqttMessageType.SUBACK);

Mqtt5BlockingClient publisher = createPublisherClient();
Expand Down
Loading

0 comments on commit b5278ea

Please sign in to comment.