diff --git a/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java b/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java index 63f5b8d2..815f8225 100644 --- a/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java +++ b/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java @@ -118,7 +118,7 @@ public ProducerDestination provisionProducerDestination(String name, } } } - return new RabbitProducerDestination(exchange, binding); + return new RabbitProducerDestination(exchange, binding,producerProperties); } @Override @@ -165,7 +165,7 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou autoBindDLQ(applyPrefix(properties.getExtension().getPrefix(), baseQueueName), queueName, properties.getExtension()); } - return new RabbitConsumerDestination(queue, binding); + return new RabbitConsumerDestination(queue, binding, properties); } /** @@ -462,16 +462,19 @@ private void removeSingleton(String name) { } } - private static final class RabbitProducerDestination implements ProducerDestination { + private static final class RabbitProducerDestination implements ProducerDestination> { private final Exchange exchange; private final Binding binding; - RabbitProducerDestination(Exchange exchange, Binding binding) { + private final ExtendedProducerProperties properties; + + RabbitProducerDestination(Exchange exchange, Binding binding, ExtendedProducerProperties properties) { Assert.notNull(exchange, "exchange must not be null"); this.exchange = exchange; this.binding = binding; + this.properties = properties; } @Override @@ -479,6 +482,12 @@ public String getName() { return this.exchange.getName(); } + @Override + public ExtendedProducerProperties getProperties() { + return this.properties; + } + + @Override public String getNameForPartition(int partition) { return this.exchange.getName(); @@ -493,15 +502,17 @@ public String toString() { } } - private static final class RabbitConsumerDestination implements ConsumerDestination { + private static final class RabbitConsumerDestination implements ConsumerDestination> { private final Queue queue; private final Binding binding; + private final ExtendedConsumerProperties properties; - RabbitConsumerDestination(Queue queue, Binding binding) { + RabbitConsumerDestination(Queue queue, Binding binding, ExtendedConsumerProperties properties) { Assert.notNull(queue, "queue must not be null"); this.queue = queue; this.binding = binding; + this.properties = properties; } @Override @@ -516,6 +527,12 @@ public String toString() { public String getName() { return this.queue.getName(); } + + @Override + public ExtendedConsumerProperties getProperties() { + return this.properties; + } + } } diff --git a/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java index 3a2e3fe8..d8763c89 100644 --- a/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -16,15 +16,10 @@ package org.springframework.cloud.stream.binder.rabbit; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; -import org.springframework.amqp.AmqpRejectAndDontRequeueException; -import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -34,9 +29,6 @@ import org.springframework.amqp.rabbit.core.support.BatchingStrategy; import org.springframework.amqp.rabbit.core.support.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; -import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; -import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; -import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor; @@ -49,25 +41,21 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; -import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties; import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner; +import org.springframework.cloud.stream.error.BinderErrorConfigurer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; -import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; @@ -88,14 +76,14 @@ * @author Ilayaperumal Gopinathan * @author David Turanski * @author Marius Bogoevici + * @author Vinicius Carvalho */ public class RabbitMessageChannelBinder extends AbstractMessageChannelBinder, ExtendedProducerProperties, RabbitExchangeQueueProvisioner> implements ExtendedPropertiesBinder { - private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy = - new AmqpMessageHeaderErrorMessageStrategy(); + private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() { @@ -126,8 +114,9 @@ public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelo private RabbitExtendedBindingProperties extendedBindingProperties = new RabbitExtendedBindingProperties(); public RabbitMessageChannelBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties, - RabbitExchangeQueueProvisioner provisioningProvider) { - super(true, new String[0], provisioningProvider); + RabbitExchangeQueueProvisioner provisioningProvider, + BinderErrorConfigurer errorConfigurer) { + super(true, new String[0], provisioningProvider, errorConfigurer); Assert.notNull(connectionFactory, "connectionFactory must not be null"); Assert.notNull(rabbitProperties, "rabbitProperties must not be null"); this.connectionFactory = connectionFactory; @@ -272,117 +261,12 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDes DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper(); mapper.setRequestHeaderNames(properties.getExtension().getHeaderPatterns()); adapter.setHeaderMapper(mapper); - ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, group, properties); - if (properties.getMaxAttempts() > 1) { - adapter.setRetryTemplate(buildRetryTemplate(properties)); - if (properties.getExtension().isRepublishToDlq()) { - adapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); - } - } - else { - adapter.setErrorMessageStrategy(errorMessageStrategy); - adapter.setErrorChannel(errorInfrastructure.getErrorChannel()); - } - return adapter; - } - @Override - protected ErrorMessageStrategy getErrorMessageStrategy() { - return errorMessageStrategy; - } - @Override - protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, - final ExtendedConsumerProperties properties) { - if (properties.getExtension().isRepublishToDlq()) { - return new MessageHandler() { - - private final RabbitTemplate template = new RabbitTemplate( - RabbitMessageChannelBinder.this.connectionFactory); - - private final String exchange = deadLetterExchangeName(properties.getExtension()); - - private final String routingKey = properties.getExtension().getDeadLetterRoutingKey(); - - @Override - public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { - Message amqpMessage = (Message) message.getHeaders() - .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); - if (!(message instanceof ErrorMessage)) { - logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " - + message); - } - else if (amqpMessage == null) { - logger.error("No raw message header in " + message); - } - else { - Throwable cause = (Throwable) message.getPayload(); - MessageProperties messageProperties = amqpMessage.getMessageProperties(); - Map headers = messageProperties.getHeaders(); - headers.put(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause)); - headers.put(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE, - cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); - headers.put(RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE, - messageProperties.getReceivedExchange()); - headers.put(RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY, - messageProperties.getReceivedRoutingKey()); - if (properties.getExtension().getRepublishDeliveyMode() != null) { - messageProperties.setDeliveryMode(properties.getExtension().getRepublishDeliveyMode()); - } - template.send(this.exchange, - this.routingKey != null ? this.routingKey : messageProperties.getConsumerQueue(), - amqpMessage); - } - } - - }; - } - else if (properties.getMaxAttempts() > 1) { - return new MessageHandler() { - - private final RejectAndDontRequeueRecoverer recoverer = new RejectAndDontRequeueRecoverer(); - - @Override - public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { - Message amqpMessage = (Message) message.getHeaders() - .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); - if (!(message instanceof ErrorMessage)) { - logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " - + message); - throw new ListenerExecutionFailedException("Unexpected error message " + message, - new AmqpRejectAndDontRequeueException(""), null); - } - else if (amqpMessage == null) { - logger.error("No raw message header in " + message); - throw new ListenerExecutionFailedException("Unexpected error message " + message, - new AmqpRejectAndDontRequeueException(""), amqpMessage); - } - else { - this.recoverer.recover(amqpMessage, (Throwable) message.getPayload()); - } - } - - }; - } - else { - return super.getErrorMessageHandler(destination, group, properties); - } + return adapter; } - @Override - protected String errorsBaseName(ConsumerDestination destination, String group, - ExtendedConsumerProperties consumerProperties) { - return destination.getName() + ".errors"; - } - private String deadLetterExchangeName(RabbitCommonProperties properties) { - if (properties.getDeadLetterExchange() == null) { - return applyPrefix(properties.getPrefix(), RabbitCommonProperties.DEAD_LETTER_EXCHANGE); - } - else { - return properties.getDeadLetterExchange(); - } - } @Override protected void afterUnbindConsumer(ConsumerDestination consumerDestination, String group, @@ -430,13 +314,11 @@ private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties) } rabbitTemplate.afterPropertiesSet(); return rabbitTemplate; + } - private String getStackTraceAsString(Throwable cause) { - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter, true); - cause.printStackTrace(printWriter); - return stringWriter.getBuffer().toString(); + RabbitMessageChannelErrorConfigurer getErrorConfigurer(){ + return (RabbitMessageChannelErrorConfigurer) this.errorConfigurer; } } diff --git a/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelErrorConfigurer.java b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelErrorConfigurer.java new file mode 100644 index 00000000..c3741bce --- /dev/null +++ b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelErrorConfigurer.java @@ -0,0 +1,164 @@ +package org.springframework.cloud.stream.binder.rabbit; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Map; + + +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; +import org.springframework.cloud.stream.binder.Binding; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.MessageProducerBinding; +import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties; +import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties; +import org.springframework.cloud.stream.error.AbstractMessageChannelErrorConfigurer; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; +import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ErrorMessage; + +/** + * @author Vinicius Carvalho + * @author Gary Russel + */ +public class RabbitMessageChannelErrorConfigurer extends AbstractMessageChannelErrorConfigurer> { + + private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy = + new AmqpMessageHeaderErrorMessageStrategy(); + + public RabbitMessageChannelErrorConfigurer(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + private final ConnectionFactory connectionFactory; + + + + + @Override + protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, final ExtendedConsumerProperties properties) { + if (properties.getExtension().isRepublishToDlq()) { + return new MessageHandler() { + + private final RabbitTemplate template = new RabbitTemplate( + RabbitMessageChannelErrorConfigurer.this.connectionFactory); + + private final String exchange = deadLetterExchangeName(properties.getExtension()); + + private final String routingKey = properties.getExtension().getDeadLetterRoutingKey(); + + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + Message amqpMessage = (Message) message.getHeaders() + .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); + if (!(message instanceof ErrorMessage)) { + logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + + message); + } + else if (amqpMessage == null) { + logger.error("No raw message header in " + message); + } + else { + Throwable cause = (Throwable) message.getPayload(); + MessageProperties messageProperties = amqpMessage.getMessageProperties(); + Map headers = messageProperties.getHeaders(); + headers.put(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause)); + headers.put(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE, + cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); + headers.put(RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE, + messageProperties.getReceivedExchange()); + headers.put(RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY, + messageProperties.getReceivedRoutingKey()); + if (properties.getExtension().getRepublishDeliveyMode() != null) { + messageProperties.setDeliveryMode(properties.getExtension().getRepublishDeliveyMode()); + } + template.send(this.exchange, + this.routingKey != null ? this.routingKey : messageProperties.getConsumerQueue(), + amqpMessage); + } + } + + }; + } + else if (properties.getMaxAttempts() > 1) { + return new MessageHandler() { + + private final RejectAndDontRequeueRecoverer recoverer = new RejectAndDontRequeueRecoverer(); + + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + Message amqpMessage = (Message) message.getHeaders() + .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); + if (!(message instanceof ErrorMessage)) { + logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + + message); + throw new ListenerExecutionFailedException("Unexpected error message " + message, + new AmqpRejectAndDontRequeueException(""), null); + } + else if (amqpMessage == null) { + logger.error("No raw message header in " + message); + throw new ListenerExecutionFailedException("Unexpected error message " + message, + new AmqpRejectAndDontRequeueException(""), amqpMessage); + } + else { + this.recoverer.recover(amqpMessage, (Throwable) message.getPayload()); + } + } + + }; + } + else { + return super.getErrorMessageHandler(destination, group, properties); + } + } + + private String deadLetterExchangeName(RabbitCommonProperties properties) { + if (properties.getDeadLetterExchange() == null) { + return RabbitMessageChannelBinder.applyPrefix(properties.getPrefix(), RabbitCommonProperties.DEAD_LETTER_EXCHANGE); + } + else { + return properties.getDeadLetterExchange(); + } + } + + @Override + protected void configure(Binding binding, String group) { + MessageProducerBinding consumerBinding = (MessageProducerBinding) binding; + ExtendedConsumerProperties properties = (ExtendedConsumerProperties)consumerBinding.getDestination().getProperties(); + AmqpInboundChannelAdapter adapter = (AmqpInboundChannelAdapter)consumerBinding.getMessageProducer(); + ErrorInfrastructure errorInfrastructure = getErrorInfrastructure(consumerBinding.getDestination().getName()); + if (properties.getMaxAttempts() > 1) { + adapter.setRetryTemplate(buildRetryTemplate(properties)); + if (properties.getExtension().isRepublishToDlq()) { + adapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); + } + } + else { + adapter.setErrorMessageStrategy(getErrorMessageStrategy()); + adapter.setErrorChannel(errorInfrastructure.getErrorChannel()); + } + } + + @Override + protected ErrorMessageStrategy getErrorMessageStrategy() { + return errorMessageStrategy; + } + + private String getStackTraceAsString(Throwable cause) { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter, true); + cause.printStackTrace(printWriter); + return stringWriter.getBuffer().toString(); + } +} diff --git a/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java index b61f9b49..4d05bfce 100644 --- a/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java +++ b/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/RabbitMessageChannelBinderConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder; +import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelErrorConfigurer; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties; import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner; @@ -39,6 +40,7 @@ * Configuration class for RabbitMQ message channel binder. * * @author David Turanski + * @author Vinicius Carvalho */ @Configuration @@ -64,7 +66,7 @@ public class RabbitMessageChannelBinderConfiguration { @Bean RabbitMessageChannelBinder rabbitMessageChannelBinder() { RabbitMessageChannelBinder binder = new RabbitMessageChannelBinder(rabbitConnectionFactory, rabbitProperties, - provisioningProvider()); + provisioningProvider(),errorConfigurer()); binder.setCodec(codec); binder.setAdminAddresses(rabbitBinderConfigurationProperties.getAdminAddresses()); binder.setCompressingPostProcessor(gZipPostProcessor()); @@ -90,5 +92,10 @@ MessagePostProcessor gZipPostProcessor() { RabbitExchangeQueueProvisioner provisioningProvider() { return new RabbitExchangeQueueProvisioner(rabbitConnectionFactory); } + + @Bean + RabbitMessageChannelErrorConfigurer errorConfigurer(){ + return new RabbitMessageChannelErrorConfigurer(rabbitConnectionFactory); + } } diff --git a/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitBinderTests.java b/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitBinderTests.java index fde43909..b41dd64c 100644 --- a/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitBinderTests.java +++ b/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitBinderTests.java @@ -139,8 +139,10 @@ public void testSendAndReceiveBad() throws Exception { DirectChannel moduleInputChannel = createBindableChannel("input", new BindingProperties()); Binding producerBinding = binder.bindProducer("bad.0", moduleOutputChannel, createProducerProperties()); + producerBinding.bind(); Binding consumerBinding = binder.bindConsumer("bad.0", "test", moduleInputChannel, createConsumerProperties()); + consumerBinding.bind(); Message message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "foo/bar") .build(); final CountDownLatch latch = new CountDownLatch(3); @@ -167,6 +169,7 @@ public void testConsumerProperties() throws Exception { properties.getExtension().setExclusive(true); Binding consumerBinding = binder.bindConsumer("props.0", null, createBindableChannel("input", new BindingProperties()), properties); + consumerBinding.bind(); Lifecycle endpoint = extractEndpoint(consumerBinding); SimpleMessageListenerContainer container = TestUtils.getPropertyValue(endpoint, "messageListenerContainer", SimpleMessageListenerContainer.class); @@ -202,7 +205,7 @@ public void testConsumerProperties() throws Exception { properties.setInstanceIndex(0); consumerBinding = binder.bindConsumer("props.0", "test", createBindableChannel("input", new BindingProperties()), properties); - + consumerBinding.bind(); endpoint = extractEndpoint(consumerBinding); container = verifyContainer(endpoint); @@ -228,6 +231,7 @@ public void testConsumerPropertiesWithUserInfrastructureNoBind() throws Exceptio Binding consumerBinding = binder.bindConsumer("propsUser1", "infra", createBindableChannel("input", new BindingProperties()), properties); + consumerBinding.bind(); Lifecycle endpoint = extractEndpoint(consumerBinding); SimpleMessageListenerContainer container = TestUtils.getPropertyValue(endpoint, "messageListenerContainer", SimpleMessageListenerContainer.class); @@ -249,6 +253,7 @@ public void testConsumerPropertiesWithUserInfrastructureCustomExchangeAndRK() th Binding consumerBinding = binder.bindConsumer("propsUser2", "infra", createBindableChannel("input", new BindingProperties()), properties); + consumerBinding.bind(); Lifecycle endpoint = extractEndpoint(consumerBinding); SimpleMessageListenerContainer container = TestUtils.getPropertyValue(endpoint, "messageListenerContainer", SimpleMessageListenerContainer.class); @@ -313,6 +318,7 @@ public void testConsumerPropertiesWithUserInfrastructureCustomQueueArgs() throws Binding consumerBinding = binder.bindConsumer("propsUser3", "infra", createBindableChannel("input", new BindingProperties()), properties); + consumerBinding.bind(); Lifecycle endpoint = extractEndpoint(consumerBinding); SimpleMessageListenerContainer container = TestUtils.getPropertyValue(endpoint, "messageListenerContainer", SimpleMessageListenerContainer.class); @@ -384,6 +390,7 @@ public void testProducerProperties() throws Exception { Binding producerBinding = binder.bindProducer("props.0", createBindableChannel("input", new BindingProperties()), createProducerProperties()); + producerBinding.bind(); Lifecycle endpoint = extractEndpoint(producerBinding); MessageDeliveryMode mode = TestUtils.getPropertyValue(endpoint, "defaultDeliveryMode", MessageDeliveryMode.class); @@ -413,6 +420,7 @@ public void testProducerProperties() throws Exception { DirectChannel channel = createBindableChannel("output", producerBindingProperties); producerBinding = binder.bindProducer("props.0", channel, producerProperties); + producerBinding.bind(); endpoint = extractEndpoint(producerBinding); assertThat(getEndpointRouting(endpoint)) .isEqualTo("'props.0-' + headers['" + BinderHeaders.PARTITION_HEADER + "']"); @@ -456,7 +464,7 @@ public void handleMessage(Message message) throws MessagingException { }); Binding consumerBinding = binder.bindConsumer("durabletest.0", "tgroup", moduleInputChannel, consumerProperties); - + consumerBinding.bind(); RabbitTemplate template = new RabbitTemplate(this.rabbitAvailableRule.getResource()); template.convertAndSend(TEST_PREFIX + "durabletest.0", "", "foo"); @@ -498,7 +506,7 @@ public void handleMessage(Message message) throws MessagingException { }); Binding consumerBinding = binder.bindConsumer("nondurabletest.0", "tgroup", moduleInputChannel, consumerProperties); - + consumerBinding.bind(); consumerBinding.unbind(); assertThat(admin.getQueueProperties(TEST_PREFIX + "nondurabletest.0.dlq")).isNull(); } @@ -524,7 +532,7 @@ public void handleMessage(Message message) throws MessagingException { }); Binding consumerBinding = binder.bindConsumer("dlqtest", "default", moduleInputChannel, consumerProperties); - + consumerBinding.bind(); RabbitTemplate template = new RabbitTemplate(this.rabbitAvailableRule.getResource()); template.convertAndSend("", TEST_PREFIX + "dlqtest.default", "foo"); @@ -561,15 +569,18 @@ public void testAutoBindDLQPartionedConsumerFirst() throws Exception { DirectChannel input0 = createBindableChannel("input", createConsumerBindingProperties(properties)); input0.setBeanName("test.input0DLQ"); Binding input0Binding = binder.bindConsumer("partDLQ.0", "dlqPartGrp", input0, properties); + input0Binding.bind(); Binding defaultConsumerBinding1 = binder.bindConsumer("partDLQ.0", "default", new QueueChannel(), properties); properties.setInstanceIndex(1); + defaultConsumerBinding1.bind(); DirectChannel input1 = createBindableChannel("input1", createConsumerBindingProperties(properties)); input1.setBeanName("test.input1DLQ"); Binding input1Binding = binder.bindConsumer("partDLQ.0", "dlqPartGrp", input1, properties); + input1Binding.bind(); Binding defaultConsumerBinding2 = binder.bindConsumer("partDLQ.0", "default", new QueueChannel(), properties); - + defaultConsumerBinding2.bind(); ExtendedProducerProperties producerProperties = createProducerProperties(); producerProperties.getExtension().setPrefix("bindertest."); producerProperties.getExtension().setAutoBindDlq(true); @@ -580,7 +591,7 @@ public void testAutoBindDLQPartionedConsumerFirst() throws Exception { DirectChannel output = createBindableChannel("output", bindingProperties); output.setBeanName("test.output"); Binding outputBinding = binder.bindProducer("partDLQ.0", output, producerProperties); - + outputBinding.bind(); final CountDownLatch latch0 = new CountDownLatch(1); input0.subscribe(new MessageHandler() { @@ -663,15 +674,18 @@ private void testAutoBindDLQPartionedConsumerFirstWithRepublishGuts(final boolea DirectChannel input0 = createBindableChannel("input", createConsumerBindingProperties(properties)); input0.setBeanName("test.input0DLQ"); Binding input0Binding = binder.bindConsumer("partPubDLQ.0", "dlqPartGrp", input0, properties); + input0Binding.bind(); Binding defaultConsumerBinding1 = binder.bindConsumer("partPubDLQ.0", "default", new QueueChannel(), properties); + defaultConsumerBinding1.bind(); properties.setInstanceIndex(1); DirectChannel input1 = createBindableChannel("input1", createConsumerBindingProperties(properties)); input1.setBeanName("test.input1DLQ"); Binding input1Binding = binder.bindConsumer("partPubDLQ.0", "dlqPartGrp", input1, properties); + input1Binding.bind(); Binding defaultConsumerBinding2 = binder.bindConsumer("partPubDLQ.0", "default", new QueueChannel(), properties); - + defaultConsumerBinding2.bind(); ExtendedProducerProperties producerProperties = createProducerProperties(); producerProperties.getExtension().setPrefix("bindertest."); producerProperties.getExtension().setAutoBindDlq(true); @@ -682,7 +696,7 @@ private void testAutoBindDLQPartionedConsumerFirstWithRepublishGuts(final boolea DirectChannel output = createBindableChannel("output", bindingProperties); output.setBeanName("test.output"); Binding outputBinding = binder.bindProducer("partPubDLQ.0", output, producerProperties); - + outputBinding.bind(); final CountDownLatch latch0 = new CountDownLatch(1); input0.subscribe(new MessageHandler() { @@ -790,7 +804,7 @@ public void testAutoBindDLQPartitionedProducerFirst() throws Exception { DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties)); output.setBeanName("test.output"); Binding outputBinding = binder.bindProducer("partDLQ.1", output, properties); - + outputBinding.bind(); ExtendedConsumerProperties consumerProperties = createConsumerProperties(); consumerProperties.getExtension().setPrefix("bindertest."); consumerProperties.getExtension().setAutoBindDlq(true); @@ -801,16 +815,19 @@ public void testAutoBindDLQPartitionedProducerFirst() throws Exception { input0.setBeanName("test.input0DLQ"); Binding input0Binding = binder.bindConsumer("partDLQ.1", "dlqPartGrp", input0, consumerProperties); + input0Binding.bind(); Binding defaultConsumerBinding1 = binder.bindConsumer("partDLQ.1", "defaultConsumer", new QueueChannel(), consumerProperties); + defaultConsumerBinding1.bind(); consumerProperties.setInstanceIndex(1); DirectChannel input1 = createBindableChannel("input1", createConsumerBindingProperties(consumerProperties)); input1.setBeanName("test.input1DLQ"); Binding input1Binding = binder.bindConsumer("partDLQ.1", "dlqPartGrp", input1, consumerProperties); + input1Binding.bind(); Binding defaultConsumerBinding2 = binder.bindConsumer("partDLQ.1", "defaultConsumer", new QueueChannel(), consumerProperties); - + defaultConsumerBinding2.bind(); final CountDownLatch latch0 = new CountDownLatch(1); input0.subscribe(new MessageHandler() { @@ -900,7 +917,7 @@ public void handleMessage(Message message) throws MessagingException { }); Binding consumerBinding = binder.bindConsumer("foo.dlqpubtest", "foo", moduleInputChannel, consumerProperties); - + consumerBinding.bind(); RabbitTemplate template = new RabbitTemplate(this.rabbitAvailableRule.getResource()); template.convertAndSend("", TEST_PREFIX + "foo.dlqpubtest.foo", "foo"); @@ -935,7 +952,7 @@ public void testBatchingAndCompression() throws Exception { DirectChannel output = createBindableChannel("input", createProducerBindingProperties(producerProperties)); output.setBeanName("batchingProducer"); Binding producerBinding = binder.bindProducer("batching.0", output, producerProperties); - + producerBinding.bind(); Log logger = spy(TestUtils.getPropertyValue(binder, "binder.compressingPostProcessor.logger", Log.class)); new DirectFieldAccessor(TestUtils.getPropertyValue(binder, "binder.compressingPostProcessor")) .setPropertyValue("logger", logger); @@ -959,7 +976,7 @@ public void testBatchingAndCompression() throws Exception { input.setBeanName("batchingConsumer"); Binding consumerBinding = binder.bindConsumer("batching.0", "test", input, createConsumerProperties()); - + consumerBinding.bind(); output.send(new GenericMessage<>("foo".getBytes())); output.send(new GenericMessage<>("bar".getBytes())); @@ -985,7 +1002,7 @@ public void testLateBinding() throws Exception { CachingConnectionFactory cf = new CachingConnectionFactory("localhost", proxy.getPort()); RabbitMessageChannelBinder rabbitBinder = new RabbitMessageChannelBinder(cf, new RabbitProperties(), - new RabbitExchangeQueueProvisioner(cf)); + new RabbitExchangeQueueProvisioner(cf), new RabbitMessageChannelErrorConfigurer(cf)); RabbitTestBinder binder = new RabbitTestBinder(cf, rabbitBinder); ExtendedProducerProperties producerProperties = createProducerProperties(); @@ -994,13 +1011,13 @@ public void testLateBinding() throws Exception { MessageChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProperties)); Binding late0ProducerBinding = binder.bindProducer("late.0", moduleOutputChannel, producerProperties); - + late0ProducerBinding.bind(); QueueChannel moduleInputChannel = new QueueChannel(); ExtendedConsumerProperties rabbitConsumerProperties = createConsumerProperties(); rabbitConsumerProperties.getExtension().setPrefix("latebinder."); Binding late0ConsumerBinding = binder.bindConsumer("late.0", "test", moduleInputChannel, rabbitConsumerProperties); - + late0ConsumerBinding.bind(); producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload.equals('0') ? 0 : 1")); producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()")); producerProperties.setPartitionCount(2); @@ -1008,7 +1025,7 @@ public void testLateBinding() throws Exception { MessageChannel partOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProperties)); Binding partlate0ProducerBinding = binder.bindProducer("partlate.0", partOutputChannel, producerProperties); - + partlate0ProducerBinding.bind(); QueueChannel partInputChannel0 = new QueueChannel(); QueueChannel partInputChannel1 = new QueueChannel(); @@ -1018,22 +1035,24 @@ public void testLateBinding() throws Exception { partLateConsumerProperties.setInstanceIndex(0); Binding partlate0Consumer0Binding = binder.bindConsumer("partlate.0", "test", partInputChannel0, partLateConsumerProperties); + partlate0Consumer0Binding.bind(); partLateConsumerProperties.setInstanceIndex(1); Binding partlate0Consumer1Binding = binder.bindConsumer("partlate.0", "test", partInputChannel1, partLateConsumerProperties); - + partlate0Consumer1Binding.bind(); ExtendedProducerProperties noDlqProducerProperties = createProducerProperties(); noDlqProducerProperties.getExtension().setPrefix("latebinder."); MessageChannel noDLQOutputChannel = createBindableChannel("output", createProducerBindingProperties(noDlqProducerProperties)); Binding noDlqProducerBinding = binder.bindProducer("lateNoDLQ.0", noDLQOutputChannel, noDlqProducerProperties); - + noDlqProducerBinding.bind(); QueueChannel noDLQInputChannel = new QueueChannel(); ExtendedConsumerProperties noDlqConsumerProperties = createConsumerProperties(); noDlqConsumerProperties.getExtension().setPrefix("latebinder."); Binding noDlqConsumerBinding = binder.bindConsumer("lateNoDLQ.0", "test", noDLQInputChannel, noDlqConsumerProperties); + noDlqConsumerBinding.bind(); MessageChannel outputChannel = createBindableChannel("output", createProducerBindingProperties(noDlqProducerProperties)); Binding pubSubProducerBinding = binder.bindProducer("latePubSub", outputChannel, @@ -1042,11 +1061,12 @@ public void testLateBinding() throws Exception { noDlqConsumerProperties.getExtension().setDurableSubscription(false); Binding nonDurableConsumerBinding = binder.bindConsumer("latePubSub", "lategroup", pubSubInputChannel, noDlqConsumerProperties); + nonDurableConsumerBinding.bind(); QueueChannel durablePubSubInputChannel = new QueueChannel(); noDlqConsumerProperties.getExtension().setDurableSubscription(true); Binding durableConsumerBinding = binder.bindConsumer("latePubSub", "lateDurableGroup", durablePubSubInputChannel, noDlqConsumerProperties); - + durableConsumerBinding.bind(); proxy.start(); moduleOutputChannel.send(new GenericMessage<>("foo")); diff --git a/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitTestBinder.java b/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitTestBinder.java index 22af545b..01d6a427 100644 --- a/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitTestBinder.java +++ b/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitTestBinder.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.Set; + import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; @@ -56,8 +57,9 @@ public class RabbitTestBinder extends AbstractTestBinder exchanges = new HashSet(); public RabbitTestBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties) { + this(connectionFactory, new RabbitMessageChannelBinder(connectionFactory, rabbitProperties, - new RabbitExchangeQueueProvisioner(connectionFactory))); + new RabbitExchangeQueueProvisioner(connectionFactory), new RabbitMessageChannelErrorConfigurer(connectionFactory))); } public RabbitTestBinder(ConnectionFactory connectionFactory, RabbitMessageChannelBinder binder) { @@ -72,6 +74,7 @@ public RabbitTestBinder(ConnectionFactory connectionFactory, RabbitMessageChanne context.refresh(); binder.setApplicationContext(context); binder.setCodec(new PojoCodec()); + binder.getErrorConfigurer().setApplicationContext(context); this.setBinder(binder); this.rabbitAdmin = new RabbitAdmin(connectionFactory); }