Skip to content

Commit

Permalink
Implemented ContentType property forward on publish receive
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed May 25, 2024
1 parent c68f37f commit 0f2b22e
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
35 changes: 35 additions & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,18 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
}
}

if (isContentTypeToValidate(msg)) {
if (!validateContentTypeAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 content type (QoS1)");
ReferenceCountUtil.release(msg);
connection.brokerDisconnect(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
connection.disconnectSession();
connection.dropConnection();

return RoutingResults.preroutingError();
}
}

final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
Expand Down Expand Up @@ -1053,6 +1065,29 @@ private static boolean isPayloadFormatToValidate(MqttPublishMessage msg) {
return false;
}

private static boolean isContentTypeToValidate(MqttPublishMessage msg) {
MqttProperties.MqttProperty contentTypeProperty = msg.variableHeader().properties()
.getProperty(MqttPropertyType.CONTENT_TYPE.value());
return contentTypeProperty != null;
}

private static boolean validateContentTypeAsUTF8(MqttPublishMessage msg) {
MqttProperties.StringProperty contentTypeProperty = (MqttProperties.StringProperty) msg.variableHeader().properties()
.getProperty(MqttPropertyType.CONTENT_TYPE.value());

byte[] rawPayload = contentTypeProperty.value().getBytes();

boolean isValid = true;
try {
// Decoder instance is stateful so shouldn't be invoked concurrently, hence one instance per call.
// Possible optimization is to use one instance per thread.
StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(rawPayload));
} catch (CharacterCodingException ex) {
isValid = false;
}
return isValid;
}

/**
* notify MqttConnectMessage after connection established (already pass login).
* @param msg
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
*
* Copyright (c) 2012-2024 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*
*/

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttProperties.BinaryProperty;
import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.shadow.com.univocity.parsers.common.processor.InputValueSwitch;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.junit.jupiter.api.Assertions.*;

public class ContentTypeTest extends AbstractServerIntegrationTest {

// second octet is invalid
public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28};

@Override
public String clientName() {
return "subscriber";
}

@Test
public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
}

@Test
public void givenAPublishWithContentTypeRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException {
Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.retain(true)
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_LEAST_ONCE) // retained works for QoS > 0
.send();

MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("temperature/living", 1);
SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
TestUtils.verifySubscribedSuccessfully(subscribeToken);

// Verify the message is also reflected back to the sender
publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS);
assertEquals("temperature/living", publishCollector.receivedTopic());
assertEquals("{\"max\": 18}", publishCollector.receivedPayload(), "Payload published on topic should match");
org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage();
assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos());
assertTrue(receivedMessage.getProperties().getPayloadFormat());
}
}

0 comments on commit 0f2b22e

Please sign in to comment.