Skip to content

Commit

Permalink
Topic Aliasing Support (#722)
Browse files Browse the repository at this point in the history
Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Nov 20, 2023
1 parent efdde24 commit 2c1b66a
Show file tree
Hide file tree
Showing 10 changed files with 554 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class Mqtt5ClientOptions {
private LifecycleEvents lifecycleEvents;
private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
private PublishEvents publishEvents;
private TopicAliasingOptions topicAliasingOptions;

/**
* Returns the host name of the MQTT server to connect to.
Expand Down Expand Up @@ -257,6 +258,15 @@ public PublishEvents getPublishEvents() {
return this.publishEvents;
}

/**
* Returns the topic aliasing options to be used by the client
*
* @return the topic aliasing options to be used by the client
*/
public TopicAliasingOptions getTopicAliasingOptions() {
return this.topicAliasingOptions;
}

/**
* Creates a Mqtt5ClientOptionsBuilder instance
* @param builder The builder to get the Mqtt5ClientOptions values from
Expand All @@ -282,6 +292,7 @@ public Mqtt5ClientOptions(Mqtt5ClientOptionsBuilder builder) {
this.lifecycleEvents = builder.lifecycleEvents;
this.websocketHandshakeTransform = builder.websocketHandshakeTransform;
this.publishEvents = builder.publishEvents;
this.topicAliasingOptions = builder.topicAliasingOptions;
}

/*******************************************************************************
Expand Down Expand Up @@ -579,6 +590,7 @@ static final public class Mqtt5ClientOptionsBuilder {
private LifecycleEvents lifecycleEvents;
private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
private PublishEvents publishEvents;
private TopicAliasingOptions topicAliasingOptions;

/**
* Sets the host name of the MQTT server to connect to.
Expand Down Expand Up @@ -835,6 +847,17 @@ public Mqtt5ClientOptionsBuilder withPublishEvents(PublishEvents publishEvents)
return this;
}

/**
* Sets the topic aliasing options for clients constructed from this builder
*
* @param options topic aliasing options that the client should use
* @return The Mqtt5ClientOptionsBuilder object
*/
public Mqtt5ClientOptionsBuilder withTopicAliasingOptions(TopicAliasingOptions options) {
this.topicAliasingOptions = options;
return this;
}

/**
* Creates a new Mqtt5ClientOptionsBuilder instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class NegotiatedSettings {
private long sessionExpiryInterval;
private int receiveMaximumFromServer;
private long maximumPacketSizeToServer;
private int topicAliasMaximumToServer;
private int topicAliasMaximumToClient;
private int serverKeepAlive;
private boolean retainAvailable;
private boolean wildcardSubscriptionsAvailable;
Expand Down Expand Up @@ -58,6 +60,21 @@ public long getMaximumPacketSizeToServer() {
return this.maximumPacketSizeToServer;
}

/**
* @return returns the maximum allowed topic alias value on publishes sent from client to server
*/
public int getTopicAliasMaximumToServer() {
return this.topicAliasMaximumToServer;
}

/**
* @return returns the maximum allowed topic alias value on publishes sent from server to client
*/
public int getTopicAliasMaximumToClient() {
return this.topicAliasMaximumToClient;
}


/**
* Returns the maximum amount of time in seconds between client packets. The client should use PINGREQs to ensure this
* limit is not breached. The server will disconnect the client for inactivity if no MQTT packet is received
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package software.amazon.awssdk.crt.mqtt5;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Configuration for all client topic aliasing behavior.
*/
public class TopicAliasingOptions {

private OutboundTopicAliasBehaviorType outboundBehavior;
private Integer outboundCacheMaxSize;
private InboundTopicAliasBehaviorType inboundBehavior;
private Integer inboundCacheMaxSize;

/**
* Default constructor
*/
public TopicAliasingOptions() {
this.outboundBehavior = OutboundTopicAliasBehaviorType.Default;
this.outboundCacheMaxSize = 0;
this.inboundBehavior = InboundTopicAliasBehaviorType.Default;
this.inboundCacheMaxSize = 0;
}

/**
* Controls what kind of outbound topic aliasing behavior the client should attempt to use.
*
* If topic aliasing is not supported by the server, this setting has no effect and any attempts to directly
* manipulate the topic alias id in outbound publishes will be ignored.
*
* By default, outbound topic aliasing is disabled.
*
* @param behavior outbound topic alias behavior to use
*
* @return the topic aliasing options object
*/
public TopicAliasingOptions withOutboundBehavior(OutboundTopicAliasBehaviorType behavior) {
this.outboundBehavior = behavior;
return this;
}

/**
* If outbound topic aliasing is set to LRU, this controls the maximum size of the cache. If outbound topic
* aliasing is set to LRU and this is zero or undefined, a sensible default is used (25). If outbound topic
* aliasing is not set to LRU, then this setting has no effect.
*
* The final size of the cache is determined by the minimum of this setting and the value of the
* topic_alias_maximum property of the received CONNACK. If the received CONNACK does not have an explicit
* positive value for that field, outbound topic aliasing is disabled for the duration of that connection.
*
* @param size maximum size to use for the outbound alias cache
*
* @return the topic aliasing options object
*/
public TopicAliasingOptions withOutboundCacheMaxSize(int size) {
this.outboundCacheMaxSize = size;
return this;
}

/**
* Controls whether or not the client allows the broker to use topic aliasing when sending publishes. Even if
* inbound topic aliasing is enabled, it is up to the server to choose whether or not to use it.
*
* If left undefined, then inbound topic aliasing is disabled.
*
* @param behavior inbound topic alias behavior to use
*
* @return the topic aliasing options object
*/
public TopicAliasingOptions withInboundBehavior(InboundTopicAliasBehaviorType behavior) {
this.inboundBehavior = behavior;
return this;
}

/**
* If inbound topic aliasing is enabled, this will control the size of the inbound alias cache. If inbound
* aliases are enabled and this is zero or undefined, then a sensible default will be used (25). If inbound
* aliases are disabled, this setting has no effect.
*
* Behaviorally, this value overrides anything present in the topic_alias_maximum field of
* the CONNECT packet options.
*
* @param size maximum size to use for the inbound alias cache
*
* @return the topic aliasing options object
*/
public TopicAliasingOptions withInboundCacheMaxSize(int size) {
this.inboundCacheMaxSize = size;
return this;
}

/**
* An enumeration that controls how the client applies topic aliasing to outbound publish packets.
*
* Topic alias behavior is described in <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113">MQTT5 Topic Aliasing</a>
*/
public enum OutboundTopicAliasBehaviorType {

/**
* Maps to Disabled. This keeps the client from being broken (by default) if the broker
* topic aliasing implementation has a problem.
*/
Default(0),

/**
* Outbound aliasing is the user's responsibility. Client will cache and use
* previously-established aliases if they fall within the negotiated limits of the connection.
*
* The user must still always submit a full topic in their publishes because disconnections disrupt
* topic alias mappings unpredictably. The client will properly use a requested alias when the most-recently-seen
* binding for a topic alias value matches the alias and topic in the publish packet.
*/
Manual(1),

/**
* (Recommended) The client will ignore any user-specified topic aliasing and instead use an LRU cache to drive
* alias usage.
*/
LRU(2),

/**
* Completely disable outbound topic aliasing.
*/
Disabled(3);

private int value;

private OutboundTopicAliasBehaviorType(int value) {
this.value = value;
}

/**
* @return The native enum integer value associated with this Java enum value
*/
public int getValue() {
return this.value;
}

/**
* Creates a Java OutboundTopicAliasBehaviorType enum value from a native integer value.
*
* @param value native integer value for the OutboundTopicAliasBehaviorType value
* @return a new OutboundTopicAliasBehaviorType value
*/
public static OutboundTopicAliasBehaviorType getEnumValueFromInteger(int value) {
OutboundTopicAliasBehaviorType enumValue = enumMapping.get(value);
if (enumValue != null) {
return enumValue;
}
throw new RuntimeException("Illegal OutboundTopicAliasBehaviorType");
}

private static Map<Integer, OutboundTopicAliasBehaviorType> buildEnumMapping() {
return Stream.of(OutboundTopicAliasBehaviorType.values())
.collect(Collectors.toMap(OutboundTopicAliasBehaviorType::getValue, Function.identity()));
}

private static Map<Integer, OutboundTopicAliasBehaviorType> enumMapping = buildEnumMapping();
}

/**
* An enumeration that controls whether or not the client allows the broker to send publishes that use topic
* aliasing.
*
* Topic alias behavior is described in https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
*/
public enum InboundTopicAliasBehaviorType {

/**
* Maps to Disabled. This keeps the client from being broken (by default) if the broker
* topic aliasing implementation has a problem.
*/
Default(0),

/**
* Allow the server to send PUBLISH packets to the client that use topic aliasing
*/
Enabled(1),

/**
* Forbid the server from sending PUBLISH packets to the client that use topic aliasing
*/
Disabled(2);

private int value;

private InboundTopicAliasBehaviorType(int value) {
this.value = value;
}

/**
* @return The native enum integer value associated with this Java enum value
*/
public int getValue() {
return this.value;
}

/**
* Creates a Java InboundTopicAliasBehaviorType enum value from a native integer value.
*
* @param value native integer value for the InboundTopicAliasBehaviorType value
* @return a new InboundTopicAliasBehaviorType value
*/
public static InboundTopicAliasBehaviorType getEnumValueFromInteger(int value) {
InboundTopicAliasBehaviorType enumValue = enumMapping.get(value);
if (enumValue != null) {
return enumValue;
}
throw new RuntimeException("Illegal InboundTopicAliasBehaviorType");
}

private static Map<Integer, InboundTopicAliasBehaviorType> buildEnumMapping() {
return Stream.of(InboundTopicAliasBehaviorType.values())
.collect(Collectors.toMap(InboundTopicAliasBehaviorType::getValue, Function.identity()));
}

private static Map<Integer, InboundTopicAliasBehaviorType> enumMapping = buildEnumMapping();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ConnAckPacket {
private Boolean retainAvailable;
private Long maximumPacketSize;
private String assignedClientIdentifier;
private Integer topicAliasMaximum;
private String reasonString;

private List<UserProperty> userProperties;
Expand Down Expand Up @@ -132,6 +133,17 @@ public String getAssignedClientIdentifier() {
return this.assignedClientIdentifier;
}

/**
* Returns the maximum topic alias value that the server will accept from the client.
*
* See <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901088">MQTT5 Topic Alias Maximum</a>
*
* @return maximum allowed topic alias value
*/
public Integer getTopicAliasMaximum() {
return this.topicAliasMaximum;
}

/**
* Returns additional diagnostic information about the result of the connection attempt.
*
Expand Down
Loading

0 comments on commit 2c1b66a

Please sign in to comment.