Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support consumer subscribe multiple topics #2729

Open
wants to merge 4 commits into
base: 2.2.x-ospp2023
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
* @author <a href="mailto:[email protected]">Jim</a>
Expand All @@ -62,13 +63,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport

private DefaultMQPushConsumer pushConsumer;

private final String topic;
private final String destination;

private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;

public RocketMQInboundChannelAdapter(String topic,
public RocketMQInboundChannelAdapter(String destination,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = topic;
this.destination = destination;
this.extendedConsumerProperties = extendedConsumerProperties;
}

Expand Down Expand Up @@ -182,10 +183,32 @@ protected void doStart() {
|| !extendedConsumerProperties.getExtension().getEnabled()) {
return;
}
Instrumentation instrumentation = new Instrumentation(topic, this);
Instrumentation instrumentation = new Instrumentation(destination, this);
try {
pushConsumer.subscribe(topic, RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription()));
if (extendedConsumerProperties.isMultiplex()) {
String[] topics = StringUtils.commaDelimitedListToStringArray(destination);
String subscription = extendedConsumerProperties.getExtension().getSubscription();
if (StringUtils.isEmpty(subscription)) {
for (String topic : topics) {
pushConsumer.subscribe(topic, "*");
}
} else {
if (subscription.contains(RocketMQUtils.SQL)) {
throw new MessagingException("Multiplex scenario doesn't support SQL92 Filtering for now, please use Tag Filtering.");
}
String[] subscriptions = StringUtils.commaDelimitedListToStringArray(subscription);
if (subscriptions.length != topics.length) {
throw new MessagingException("Length of subscriptions should be the same as the length of topics.");
}
for (int i = 0; i < topics.length; i++) {
pushConsumer.subscribe(topics[i], subscriptions[i]);
}
}
} else {
pushConsumer.subscribe(destination, RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription()));
}

pushConsumer.start();
instrumentation.markStartedSuccessfully();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
* @author <a href="mailto:[email protected]">Jim</a>
Expand All @@ -61,7 +63,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>

private volatile boolean running;

private final String topic;
private final String name;

private final MessageSelector messageSelector;

Expand All @@ -71,7 +73,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>

public RocketMQMessageSource(String name,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = name;
this.name = name;
this.messageSelector = RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription());
this.extendedConsumerProperties = extendedConsumerProperties;
Expand All @@ -80,24 +82,47 @@ public RocketMQMessageSource(String name,

@Override
public synchronized void start() {
Instrumentation instrumentation = new Instrumentation(topic, this);
Instrumentation instrumentation = new Instrumentation(name, this);
try {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
}
this.consumer = RocketMQConsumerFactory
.initPullConsumer(topic, extendedConsumerProperties);
.initPullConsumer(name, extendedConsumerProperties);
// This parameter must be 1, otherwise doReceive cannot be handled singly.
// this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
String subscription = extendedConsumerProperties.getExtension().getSubscription();
if (extendedConsumerProperties.isMultiplex()) {
String[] topics = StringUtils.commaDelimitedListToStringArray(name);
if (StringUtils.isEmpty(subscription)) {
for (String topic : topics) {
consumer.subscribe(topic, "*");
}
} else {
if (subscription.contains(RocketMQUtils.SQL)) {
throw new MessagingException("Multiplex scenario doesn't support SQL92 Filtering for now, please use Tag Filtering.");
}
String[] subscriptions = StringUtils.commaDelimitedListToStringArray(subscription);
if (subscriptions.length != topics.length) {
throw new MessagingException("Length of subscriptions should be the same as the length of topics.");
}
for (int i = 0; i < topics.length; i++) {
consumer.subscribe(topics[i], subscriptions[i]);
}
}
// Initialize messageQueuesForTopic immediately
for (String topic: topics) {
messageQueuesForTopic.put(topic, consumer.fetchMessageQueues(topic));
}
} else {
consumer.subscribe(name, RocketMQUtils.getMessageSelector(subscription));
messageQueuesForTopic.put(name, consumer.fetchMessageQueues(name));
}

this.consumer.setAutoCommit(false);
// register TopicMessageQueueChangeListener for messageQueuesForTopic
consumer.registerTopicMessageQueueChangeListener(topic,
messageQueuesForTopic::put);
this.consumer.start();
// Initialize messageQueuesForTopic immediately
messageQueuesForTopic.put(topic, consumer.fetchMessageQueues(topic));

instrumentation.markStartedSuccessfully();
}
catch (MQClientException e) {
Expand Down Expand Up @@ -128,7 +153,9 @@ private MessageQueue acquireCurrentMessageQueue(String topic, int queueId,
@Override
public synchronized void stop() {
if (this.isRunning() && null != consumer) {
consumer.unsubscribe(topic);
for (String topic: StringUtils.commaDelimitedListToStringArray(name)) {
consumer.unsubscribe(topic);
}
consumer.shutdown();
this.running = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.StringUtils;

/**
* @author Timur Valiev
Expand All @@ -39,21 +40,24 @@ public class RocketMQTopicProvisioner implements
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties> properties)
throws ProvisioningException {
checkTopic(name);
checkDestination(name);
return new RocketProducerDestination(name);
}

@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> properties)
throws ProvisioningException {
checkTopic(name);
checkDestination(name);
return new RocketConsumerDestination(name);
}

private void checkTopic(String topic) {
private void checkDestination(String destination) {
String[] topics = StringUtils.commaDelimitedListToStringArray(destination);
try {
Validators.checkTopic(topic);
for (String topic: topics) {
Validators.checkTopic(topic);
}
}
catch (MQClientException e) {
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ public static String getNameServerStr(String nameServer) {
return nameServer.replaceAll(",", ";");
}

private static final String SQL = "sql:";
/**
* the prefix of subscription when using SQL92 expression.
*/
public static final String SQL = "sql:";

public static MessageSelector getMessageSelector(String expression) {
if (StringUtils.hasText(expression) && expression.startsWith(SQL)) {
Expand Down