diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index b1049a7da..84a12d710 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -559,6 +559,7 @@ public void close() { // Update all not clean session with the proper expiry date updateNotCleanSessionsWithProperExpire(); queueRepository.close(); + pool.values().forEach(Session::cleanUp); } private void updateNotCleanSessionsWithProperExpire() { diff --git a/broker/src/main/java/io/moquette/broker/metrics/MQTTMessageLogger.java b/broker/src/main/java/io/moquette/broker/metrics/MQTTMessageLogger.java index 3c390716f..405eff490 100644 --- a/broker/src/main/java/io/moquette/broker/metrics/MQTTMessageLogger.java +++ b/broker/src/main/java/io/moquette/broker/metrics/MQTTMessageLogger.java @@ -100,7 +100,9 @@ private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String di break; case PUBLISH: MqttPublishMessage publish = (MqttPublishMessage) msg; - LOG.debug("{} PUBLISH <{}> to topics <{}>", direction, clientID, publish.variableHeader().topicName()); + LOG.debug("{} PUBLISH <{}> to topics <{}> qos {} packetId <{}>", direction, clientID, + publish.variableHeader().topicName(), publish.fixedHeader().qosLevel(), publish.variableHeader().packetId() + ); break; case PUBREC: case PUBCOMP: diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index c65bdde50..79af76160 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -112,6 +112,9 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc .getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()); assertNotNull(messageExpiryProperty, "message expiry property must be present"); assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered"); + + // send PUBACK to release + acknowledge(publish.variableHeader().packetId()); assertTrue(publish.release(), "Last reference of publish should be released"); } diff --git a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java index 0c0fb6290..05aedf6c2 100644 --- a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java +++ b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java @@ -50,6 +50,9 @@ public String getID() { public void onPublish(InterceptPublishMessage msg) { final String decodedPayload = msg.getPayload().toString(UTF_8); System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload); + + // super handle the message buffer count + super.onPublish(msg); } @Override