diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index 08b07d7e5..dd038c998 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -145,9 +145,7 @@ public void testWillMessageIsFiredOnClientKeepAliveExpiry() throws Exception { @Test public void testRejectConnectWithEmptyClientID() throws InterruptedException { LOG.info("*** testRejectConnectWithEmptyClientID ***"); - m_client.clientId("").connect(); - - this.receivedMsg = this.m_client.lastReceivedMessage(); + this.receivedMsg = m_client.clientId("").connect(); assertTrue(receivedMsg instanceof MqttConnAckMessage); MqttConnAckMessage connAck = (MqttConnAckMessage) receivedMsg; diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index 8dd989d01..13da64aa0 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -46,12 +46,12 @@ public void tearDown() throws Exception { super.tearDown(); } - void connectLowLevel() { + void connectLowLevel() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); assertConnectionAccepted(connAck, "Connection must be accepted"); } - void connectLowLevel(int keepAliveSecs) { + void connectLowLevel(int keepAliveSecs) throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE); assertConnectionAccepted(connAck, "Connection must be accepted"); } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java index 12b3479c1..961948dfe 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java @@ -57,7 +57,7 @@ public void testAckResponseProperties() { } @Test - public void testAssignedClientIdentifier() { + public void testAssignedClientIdentifier() throws InterruptedException { Client unnamedClient = new Client("localhost").clientId(""); connAck = unnamedClient.connectV5(); assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected"); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index d65d2cc39..3509b1309 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -71,7 +71,7 @@ public void simpleConnect() { } @Test - public void sendConnectOnDisconnectedConnection() { + public void sendConnectOnDisconnectedConnection() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); TestUtils.assertConnectionAccepted(connAck, "Connection must be accepted"); lowLevelClient.disconnect(); @@ -85,7 +85,7 @@ public void sendConnectOnDisconnectedConnection() { } @Test - public void receiveInflightPublishesAfterAReconnect() { + public void receiveInflightPublishesAfterAReconnect() throws InterruptedException { final Mqtt5BlockingClient publisher = MqttClient.builder() .useMqttVersion5() .identifier("publisher") diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java index 1535cad90..eaafb8525 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java @@ -27,13 +27,10 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.*; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.TimeUnit; import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -43,11 +40,8 @@ public class FlowControlTest extends AbstractServerIntegrationTest { - private static final Logger LOG = LoggerFactory.getLogger(FlowControlTest.class); - @Test public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected() throws IOException, InterruptedException { - LOG.info("givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected"); final int serverSendQuota = 5; // stop existing broker to restart with receiveMaximum configured @@ -78,7 +72,7 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect // sixth should exceed quota and the client should get a disconnect sendQoS2Publish(); - MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage(); + MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(MqttMessageType.DISCONNECT, receivedMsg.fixedHeader().messageType(), "On sixth in flight message the send quota is exhausted and response should be DISCONNECT"); MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMsg.variableHeader(); @@ -88,8 +82,8 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect assertTrue(lowLevelClient.isConnectionLost(), "Connection MUST be closed by the server"); } - private void verifyReceived(MqttMessageType expectedMessageType) { - MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage(); + private void verifyReceived(MqttMessageType expectedMessageType) throws InterruptedException { + MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(expectedMessageType, receivedMsg.fixedHeader().messageType()); } @@ -99,7 +93,7 @@ private void sendQoS2Publish() { MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, MqttProperties.NO_PROPERTIES); ByteBuf payload = Unpooled.wrappedBuffer("18°C".getBytes(StandardCharsets.UTF_8)); MqttPublishMessage publishQoS2 = new MqttPublishMessage(fixedHeader, variableHeader, payload); - lowLevelClient.publish(publishQoS2, 500, TimeUnit.MILLISECONDS); + lowLevelClient.publish(publishQoS2); } @Override @@ -110,7 +104,6 @@ public String clientName() { @Test public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient() throws InterruptedException { - LOG.info("givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient"); connectLowLevel(); // subscribe with an identifier @@ -158,7 +151,6 @@ private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient @Test public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException { - LOG.info("givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose"); // connect subscriber and published // publisher send 20 events, 10 should be in the inflight, 10 remains on the queue connectLowLevel(); @@ -167,7 +159,6 @@ public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectl MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", MqttQoS.AT_LEAST_ONCE, 123); - LOG.info("\n\n\n\n"); verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index c65bdde50..314e8c288 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -153,7 +153,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", - MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS); + MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(500)); verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java index ff0b7b405..2a09bf0d7 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java @@ -23,8 +23,12 @@ import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; +import io.moquette.logging.LoggingUtils; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; @@ -43,16 +47,21 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThat; public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(PayloadFormatIndicatorTest.class); // second octet is invalid public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28}; @@ -64,11 +73,14 @@ public String clientName() { @Test public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + LOG.info("givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent TEST START"); Mqtt5BlockingClient subscriber = createSubscriberClient(); - subscriber.subscribeWith() + Mqtt5SubAck subAck = subscriber.subscribeWith() .topicFilter("temperature/living") .qos(MqttQos.AT_MOST_ONCE) .send(); + assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0); + LOG.info("SUBACK received"); Mqtt5BlockingClient publisher = createPublisherClient(); publisher.publishWith() @@ -77,6 +89,7 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .qos(MqttQos.AT_MOST_ONCE) .send(); + LOG.info("PUB QoS0 sent"); verifyPublishMessage(subscriber, msgPub -> { assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); @@ -168,10 +181,10 @@ public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetT MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props); MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES)); // in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT - lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS); + lowLevelClient.publish(publishQoS0); // Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed - final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage(); + final MqttMessage receivedMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType()); MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader(); assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(), diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java index e0997816c..b9313cfc0 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java @@ -45,7 +45,7 @@ public String clientName() { } @Test - public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() { + public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() throws InterruptedException { connectLowLevel(); MqttMessage received = lowLevelClient.subscribeWithError("$share/+/measures/temp", MqttQoS.AT_LEAST_ONCE); @@ -57,7 +57,7 @@ public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisco } @Test - public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() { + public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() throws InterruptedException { connectLowLevel(); MqttMessage received = lowLevelClient.subscribeWithError("$share/metrics/measures/temp", MqttQoS.AT_LEAST_ONCE); @@ -70,7 +70,7 @@ public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptio } @Test - public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException { + public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException, InterruptedException { // stop already started broker instance stopServer(); @@ -93,7 +93,7 @@ public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWit @Test - public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException { + public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException, InterruptedException { // stop already started broker instance stopServer(); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java index 6176df14d..88e59749b 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java @@ -29,7 +29,7 @@ public void givenNonSharedSubscriptionWithIdentifierWhenPublishMatchedThenReceiv // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("/metrics/measures/temp", - MqttQoS.AT_LEAST_ONCE, 123, 400, TimeUnit.MILLISECONDS); + MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(400)); verifyOfType(received, MqttMessageType.SUBACK); Mqtt5BlockingClient publisher = createPublisherClient(); diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index bd2ccf712..5a2e45dfc 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -31,7 +31,6 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +57,7 @@ public interface ICallback { EventLoopGroup workerGroup; Channel m_channel; private boolean m_connectionLost; - private volatile ICallback callback; +// private volatile ICallback callback; private String clientId; private AtomicReference receivedMsg = new AtomicReference<>(); private final BlockingQueue receivedMessages = new LinkedBlockingQueue<>(); @@ -101,7 +100,7 @@ public Client clientId(String clientId) { return this; } - public void connect(String willTestamentTopic, String willTestamentMsg) { + public void connect(String willTestamentTopic, String willTestamentMsg) throws InterruptedException { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNECT, false, @@ -132,24 +131,24 @@ public void connect(String willTestamentTopic, String willTestamentMsg) { doConnect(connectMessage); } - public void connect() { + public MqttConnAckMessage connect() throws InterruptedException { MqttConnectMessage connectMessage = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_3_1_1) .clientId("").keepAlive(2) // secs .willFlag(false).willQoS(MqttQoS.AT_MOST_ONCE).build(); - doConnect(connectMessage); + return doConnect(connectMessage); } - public MqttConnAckMessage connectV5() { + public MqttConnAckMessage connectV5() throws InterruptedException { return connectV5(2, BrokerConstants.INFLIGHT_WINDOW_SIZE); } - public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) { + public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) throws InterruptedException { return connectV5(2, receiveMaximumInflight); } @NotNull - public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) { + public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) throws InterruptedException { final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5); if (clientId != null) { builder.clientId(clientId); @@ -171,33 +170,14 @@ public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInfligh return doConnect(connectMessage); } - private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) { - final CountDownLatch latch = new CountDownLatch(1); - LOG.info("Callback set by CONNECT"); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.info("Connect callback invocation, received message {}", msg.fixedHeader().messageType()); - latch.countDown(); - - // clear the callback - LOG.info("Callback set null by CONNACK"); - setCallback(null); - }); + private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) throws InterruptedException { this.sendMessage(connectMessage); - boolean waitElapsed; - try { - waitElapsed = !latch.await(2_000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (waitElapsed) { + final MqttMessage connAckMessage = this.receiveNextMessage(Duration.ofMillis(2_000)); + if (connAckMessage == null) { throw new RuntimeException("Cannot receive ConnAck in 2 s"); } - - final MqttMessage connAckMessage = this.receivedMsg.get(); if (!(connAckMessage instanceof MqttConnAckMessage)) { MqttMessageType messageType = connAckMessage.fixedHeader().messageType(); throw new RuntimeException("Expected a CONN_ACK message but received " + messageType); @@ -205,32 +185,32 @@ private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) { return (MqttConnAckMessage) connAckMessage; } - public MqttSubAckMessage subscribe(String topic1, MqttQoS qos1, String topic2, MqttQoS qos2) { + public MqttSubAckMessage subscribe(String topic1, MqttQoS qos1, String topic2, MqttQoS qos2) throws InterruptedException { final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() .messageId(1) .addSubscription(qos1, topic1) .addSubscription(qos2, topic2) .build(); - return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION); } - public MqttSubAckMessage subscribe(String topic, MqttQoS qos) { + public MqttSubAckMessage subscribe(String topic, MqttQoS qos) throws InterruptedException { final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() .messageId(1) .addSubscription(qos, topic) .build(); - return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION); } - public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier) { - return subscribeWithIdentifier(topic, qos, subscriptionIdentifier, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier) throws InterruptedException { + return subscribeWithIdentifier(topic, qos, subscriptionIdentifier, TIMEOUT_DURATION); } @NotNull public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier, - long timeout, TimeUnit timeUnit) { + Duration timeout) throws InterruptedException { MqttProperties subProps = new MqttProperties(); subProps.add(new MqttProperties.IntegerProperty( MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), @@ -242,14 +222,14 @@ public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int .properties(subProps) .build(); - return doSubscribeWithAckCasting(subscribeMessage, timeout, timeUnit); + return doSubscribeWithAckCasting(subscribeMessage, timeout); } @NotNull - private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage, long timeout, TimeUnit timeUnit) { - doSubscribe(subscribeMessage, timeout, timeUnit); + private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage, Duration timeout) throws InterruptedException { + doSubscribe(subscribeMessage); - final MqttMessage subAckMessage = this.receivedMsg.get(); + final MqttMessage subAckMessage = this.receiveNextMessage(timeout); if (!(subAckMessage instanceof MqttSubAckMessage)) { MqttMessageType messageType = subAckMessage.fixedHeader().messageType(); throw new RuntimeException("Expected a SUB_ACK message but received " + messageType); @@ -257,62 +237,16 @@ private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscri return (MqttSubAckMessage) subAckMessage; } - private void doSubscribe(MqttSubscribeMessage subscribeMessage, long timeout, TimeUnit timeUnit) { - final CountDownLatch subscribeAckLatch = new CountDownLatch(1); - LOG.info("Callback set by SUBSCRIBE"); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.debug("Subscribe callback invocation, received message {}", msg.fixedHeader().messageType()); - subscribeAckLatch.countDown(); - - // clear the callback - LOG.info("Callback set null by SUBACK"); - setCallback(null); - }); - + private void doSubscribe(MqttSubscribeMessage subscribeMessage) { LOG.debug("Sending SUBSCRIBE message"); sendMessage(subscribeMessage); LOG.debug("Sent SUBSCRIBE message"); - - boolean waitElapsed; - try { - waitElapsed = !subscribeAckLatch.await(timeout, timeUnit); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (waitElapsed) { - throw new RuntimeException("Cannot receive SubscribeAck in " + timeout + " " + timeUnit); - } } - public void publish(MqttPublishMessage publishMessage, int timeout, TimeUnit timeUnit) { - final CountDownLatch publishResponseLatch = new CountDownLatch(1); - LOG.info("Callback set by PUBLISH"); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.debug("Publish callback invocation, received message {}", msg.fixedHeader().messageType()); - publishResponseLatch.countDown(); - - // clear the callback - LOG.info("Callback set null by PUBLISH"); - setCallback(null); - }); - + public void publish(MqttPublishMessage publishMessage) { LOG.debug("Sending PUBLISH message"); sendMessage(publishMessage); LOG.debug("Sent PUBLISH message"); - - boolean notExpired; - try { - notExpired = publishResponseLatch.await(timeout, timeUnit); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (! notExpired) { - throw new RuntimeException("Cannot receive any message after PUBLISH in " + timeout + " " + timeUnit); - } } public MqttMessage subscribeWithError(String topic, MqttQoS qos) { @@ -321,8 +255,16 @@ public MqttMessage subscribeWithError(String topic, MqttQoS qos) { .addSubscription(qos, topic) .build(); - doSubscribe(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); - return this.receivedMsg.get(); + doSubscribe(subscribeMessage); + try { + MqttMessage mqttMessage = this.receiveNextMessage(TIMEOUT_DURATION); + if (mqttMessage == null) { + throw new RuntimeException("Cannot receive SubscribeAck in " + TIMEOUT_DURATION); + } + return mqttMessage; + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting", e); + } } public void disconnect() { @@ -334,9 +276,9 @@ public void shutdownConnection() throws InterruptedException { this.workerGroup.shutdownGracefully().sync(); } - public void setCallback(ICallback callback) { - this.callback = callback; - } +// public void setCallback(ICallback callback) { +// this.callback = callback; +// } public void sendMessage(MqttMessage msg) { m_channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE); @@ -348,12 +290,12 @@ public MqttMessage lastReceivedMessage() { void messageReceived(MqttMessage msg) { LOG.info("Received message {}", msg); - LOG.debug("Callback is {}", callback); - if (this.callback != null) { - this.callback.call(msg); - } else { +// LOG.debug("Callback is {}", callback); +// if (this.callback != null) { +// this.callback.call(msg); +// } else { receivedMessages.add(msg); - } +// } } public boolean hasReceivedMessages() { diff --git a/broker/src/test/resources/log4j.properties b/broker/src/test/resources/log4j.properties index abd39d987..90c3b0a10 100644 --- a/broker/src/test/resources/log4j.properties +++ b/broker/src/test/resources/log4j.properties @@ -7,9 +7,9 @@ log4j.logger.io.moquette=WARN #log4j.logger.io.moquette.broker.SessionRegistry=DEBUG #log4j.logger.io.moquette.broker.PostOffice=DEBUG #log4j.logger.io.moquette.broker=WARN -log4j.logger.io.moquette.broker.subscriptions.CTrieSpeedTest=INFO -log4j.logger.io.moquette.integration.ServerIntegrationRestartTest=INFO -log4j.logger.io.moquette.integration.mqtt5.FlowControlTest=INFO +log4j.logger.io.moquette.integration.mqtt5.PayloadFormatIndicatorTest=INFO +#log4j.logger.io.moquette.integration.ServerIntegrationRestartTest=INFO +#log4j.logger.io.moquette.integration.mqtt5.FlowControlTest=INFO log4j.logger.io.moquette.testclient.Client=DEBUG log4j.logger.BufferManagement=TRACE