diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index a71d67961..6143c1a47 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -254,6 +254,7 @@ private List recursiveMatch(Topic topicName, INode inode, int dept // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { + Topic remainingRealTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptFullHeadToken(); subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); } subInode = cnode.childOf(Token.SINGLE); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java index ca3544ed1..64bc8cf27 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java @@ -25,15 +25,29 @@ public class Token implements Comparable { static final Token MULTI = new Token("#"); static final Token SINGLE = new Token("+"); final String name; + boolean lastSubToken; protected Token(String s) { + this(s, true); + } + + protected Token(String s, boolean isLastSub) { name = s; + lastSubToken = isLastSub; } protected String name() { return name; } + protected void setLastSubToken(boolean lastSubToken) { + this.lastSubToken = lastSubToken; + } + + protected boolean isLastSubToken() { + return lastSubToken; + } + @Override public int hashCode() { int hash = 7; diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java index 96940e116..3778f6571 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java @@ -31,6 +31,8 @@ public class Topic implements Serializable, Comparable { private static final Logger LOG = LoggerFactory.getLogger(Topic.class); + public static int MAX_TOKEN_LENGTH = 4; + private static final long serialVersionUID = 2438799283749822L; private final String topic; @@ -55,7 +57,7 @@ public Topic(String topic) { Topic(List tokens) { this.tokens = tokens; - List strTokens = tokens.stream().map(Token::toString).collect(Collectors.toList()); + List strTokens = fullTokens().stream().map(Token::toString).collect(Collectors.toList()); this.topic = String.join("/", strTokens); this.valid = true; } @@ -74,7 +76,24 @@ public List getTokens() { return tokens; } - private List parseTopic(String topic) throws ParseException { + public List fullTokens() { + List fullTokens = new ArrayList<>(); + String currentToken = null; + for (Token token : getTokens()) { + if (currentToken == null) { + currentToken = token.name; + } else { + currentToken += token.name; + } + if (token.isLastSubToken()) { + fullTokens.add(new Token(currentToken, true)); + currentToken = null; + } + } + return fullTokens; + } + + private static List parseTopic(String topic) throws ParseException { if (topic.length() == 0) { throw new ParseException("Bad format of topic, topic MUST be at least 1 character [MQTT-4.7.3-1] and " + "this was empty", 0); @@ -117,7 +136,18 @@ private List parseTopic(String topic) throws ParseException { } else if (s.contains("+")) { throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i); } else { - res.add(new Token(s)); + final int l = s.length(); + int start = 0; + Token token = null; + while (start < l) { + int end = Math.min(start + MAX_TOKEN_LENGTH, l); + final String subToken = s.substring(start, end); + token = new Token(subToken, false); + res.add(token); + start = end; + } + // Can't be null because s can't be empty. + token.setLastSubToken(true); } } @@ -151,6 +181,22 @@ public Topic exceptHeadToken() { return new Topic(tokensCopy); } + /** + * @return a new Topic corresponding to this less than the full head token, skipping any sub-tokens. + */ + public Topic exceptFullHeadToken() { + List tokens = getTokens(); + if (tokens.isEmpty()) { + return new Topic(Collections.emptyList()); + } + List tokensCopy = new ArrayList<>(tokens); + Token removed; + do { + removed = tokensCopy.remove(0); + } while (!removed.isLastSubToken() && !tokensCopy.isEmpty()); + return new Topic(tokensCopy); + } + public boolean isValid() { if (tokens == null) getTokens(); @@ -169,14 +215,16 @@ public boolean isValid() { public boolean match(Topic subscriptionTopic) { List msgTokens = getTokens(); List subscriptionTokens = subscriptionTopic.getTokens(); + // Due to sub-tokens and the + wildcard, indexes may differ. int i = 0; - for (; i < subscriptionTokens.size(); i++) { + int m = 0; + for (; i < subscriptionTokens.size(); i++, m++) { Token subToken = subscriptionTokens.get(i); if (!Token.MULTI.equals(subToken) && !Token.SINGLE.equals(subToken)) { - if (i >= msgTokens.size()) { + if (m >= msgTokens.size()) { return false; } - Token msgToken = msgTokens.get(i); + Token msgToken = msgTokens.get(m); if (!msgToken.equals(subToken)) { return false; } @@ -184,12 +232,20 @@ public boolean match(Topic subscriptionTopic) { if (Token.MULTI.equals(subToken)) { return true; } -// if (Token.SINGLE.equals(subToken)) { -// // skip a step forward -// } + if (m >= msgTokens.size()) { + return false; + } + if (Token.SINGLE.equals(subToken)) { + // skip to the next full token in the message topic + Token msgToken = msgTokens.get(m); + while (!msgToken.isLastSubToken()) { + m++; + msgToken = msgTokens.get(m); + } + } } } - return i == msgTokens.size(); + return m == msgTokens.size(); } @Override diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 60660cbfd..98e0de858 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -44,21 +44,34 @@ static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { @Test @Timeout(value = MAX_DURATION_S) public void testManyClientsFewTopics() { - List subscriptionList = prepareSubscriptionsManyClientsFewTopic(); + + List subscriptionList = prepareSubscriptionsManyClientsFewTopic(50_000); createSubscriptions(subscriptionList); } @Test @Timeout(value = MAX_DURATION_S) public void testFlat() { - List results = prepareSubscriptionsFlat(); - createSubscriptions(results); + Topic.MAX_TOKEN_LENGTH = 1; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 2; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 3; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 4; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 5; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 6; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 7; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); } @Test @Timeout(value = MAX_DURATION_S) public void testDeep() { - List results = prepareSubscriptionsDeep(); + List results = prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); createSubscriptions(results); } @@ -83,38 +96,45 @@ public void createSubscriptions(List results) { } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Added " + count + " subscriptions in " + duration + " ms (" + Math.round(1000.0 * count / duration) + "/s)"); + final long speed = Math.round(1000.0 * count / duration); + LOGGER.info("{}: Added {} subscriptions in {} ms ({}/s)", Topic.MAX_TOKEN_LENGTH, count, duration, speed); } - public List prepareSubscriptionsManyClientsFewTopic() { - List subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) { - Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test"); + public List prepareSubscriptionsManyClientsFewTopic(int subCount) { + List subscriptionList = new ArrayList<>(subCount); + long start = System.currentTimeMillis(); + for (int i = 0; i < subCount; i++) { + Topic topic = asTopic("topic/test/" + new Random().nextInt(10) + "/test"); subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); } + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.debug("Prepared {} subscriptions in {} ms on 10 topics", subCount, duration); return subscriptionList; } - public List prepareSubscriptionsFlat() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); + public List prepareSubscriptionsFlat(int subCount) { + List results = new ArrayList<>(subCount); int count = 0; long start = System.currentTimeMillis(); - for (int topicNr = 0; topicNr < TOTAL_SUBSCRIPTIONS / 10; topicNr++) { - for (int clientNr = 0; clientNr < 10; clientNr++) { + final int clientCount = 1; + final int topicCount = subCount / clientCount; + for (int clientNr = 0; clientNr < clientCount; clientNr++) { + for (int topicNr = 0; topicNr < topicCount; topicNr++) { count++; - results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + topicNr)); + results.add(clientSubOnTopic("Client-" + clientNr, topicNr + "-mainTopic")); } } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Prepared {} subscriptions in {} ms", count, duration); + LOGGER.debug("Prepared {} subscriptions for {} topics in {} ms", count, topicCount, duration); return results; } - public List prepareSubscriptionsDeep() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - long countPerLevel = Math.round(Math.pow(TOTAL_SUBSCRIPTIONS, 0.25)); - LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", TOTAL_SUBSCRIPTIONS, countPerLevel); + public List prepareSubscriptionsDeep(int subCount) { + List results = new ArrayList<>(subCount); + long countPerLevel = Math.round(Math.pow(subCount, 0.25)); + LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", subCount, countPerLevel); int count = 0; long start = System.currentTimeMillis(); outerloop: @@ -125,7 +145,7 @@ public List prepareSubscriptionsDeep() { count++; results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + firstLevelNr + "/subTopic-" + secondLevelNr + "/subSubTopic" + thirdLevelNr)); // Due to the 4th-power-root we don't get exactly the required number of subs. - if (count >= TOTAL_SUBSCRIPTIONS) { + if (count >= subCount) { break outerloop; } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java index 134213f6a..49ee4d565 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java @@ -145,7 +145,7 @@ public TopicAssert doesNotMatch(String topic) { } public TopicAssert containsToken(Object... tokens) { - Assertions.assertThat(actual.getTokens()).containsExactly(asArray(tokens)); + Assertions.assertThat(actual.fullTokens()).containsExactly(asArray(tokens)); return myself; }