From 726cfcfa6f6d2e1eab1e04e81b9d4d5db7aad1f0 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 14 Feb 2024 16:38:11 -0500 Subject: [PATCH] Rework an observation for Rabbit Binder The observation propagation doesn't work in multi-binder configuration * Remove `ObservationAutoConfiguration` since it is not visible in case of multi-binder configuration * Instead move `setObservationEnabled` flag setting to the `RabbitMessageChannelBinder` * Add `io.micrometer.observation.ObservationRegistry` into `shared.beans` to make it visible for binder-specific application context * Add `RabbitMultiBinderObservationTests` integration test where Rabbit Binder is in multi-binder environment As a side effect this fixes an observation propagation for Kafka binder as well in the multi-binder environment. Its configuration is OK, but an `ObservationRegistry` must make it visible for the binder-specific application context. See the mentioned `shared.beans` Related to https://github.com/spring-cloud/spring-cloud-stream/issues/2901 Also see https://github.com/spring-cloud/spring-cloud-stream/issues/2902 for possible evolution --- .../spring-cloud-stream-binder-rabbit/pom.xml | 32 +++++ .../rabbit/RabbitMessageChannelBinder.java | 17 ++- .../stream/binder/rabbit/StreamUtils.java | 8 +- .../config/ObservationAutoConfiguration.java | 63 ---------- ...ot.autoconfigure.AutoConfiguration.imports | 1 - .../RabbitMultiBinderObservationTests.java | 110 ++++++++++++++++++ .../rabbit-multi-binder-observation.yml | 34 ++++++ .../src/main/resources/META-INF/shared.beans | 1 + 8 files changed, 196 insertions(+), 70 deletions(-) delete mode 100644 binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/ObservationAutoConfiguration.java create mode 100644 binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitMultiBinderObservationTests.java create mode 100644 binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/resources/rabbit-multi-binder-observation.yml diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml index 06e79d0f42..560edfaa37 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml @@ -99,6 +99,38 @@ 1.17.1 test + + io.micrometer + micrometer-tracing-bridge-brave + test + + + io.micrometer + micrometer-tracing-integration-test + test + + + io.opentelemetry + * + + + com.wavefront + * + + + io.zipkin.reporter2 + * + + + io.micrometer + micrometer-tracing-bridge-otel + + + io.micrometer + micrometer-tracing-reporter-wavefront + + + org.apache.httpcomponents diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java index 57d8955c4f..300536b89d 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -32,6 +32,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Envelope; +import io.micrometer.observation.ObservationRegistry; import jakarta.validation.constraints.NotNull; import org.springframework.amqp.AmqpRejectAndDontRequeueException; @@ -54,6 +55,7 @@ import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.ObservableListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; @@ -87,6 +89,7 @@ import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.expression.Expression; @@ -500,7 +503,7 @@ protected MessageProducer createConsumerEndpoint( "the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively"); String destination = consumerDestination.getName(); RabbitConsumerProperties extension = properties.getExtension(); - MessageListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group, + ObservableListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group, properties, destination, extension); String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true); if (properties.getExtension().getContainerType() != ContainerType.STREAM @@ -509,6 +512,10 @@ protected MessageProducer createConsumerEndpoint( } getContainerCustomizer().configure(listenerContainer, consumerDestination.getName(), group); + // TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902 + getApplicationContext().getBeanProvider(ObservationRegistry.class) + .ifAvailable((observationRegistry) -> listenerContainer.setObservationEnabled(true)); + listenerContainer.afterPropertiesSet(); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer); @@ -540,7 +547,7 @@ protected MessageProducer createConsumerEndpoint( return adapter; } - private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination, + private ObservableListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties properties, String destination, RabbitConsumerProperties extension) { @@ -597,6 +604,7 @@ else if (getApplicationContext() != null) { q -> extension.getConsumerTagPrefix() + "#" + index.getAndIncrement()); } + listenerContainer.setApplicationContext(getApplicationContext()); return listenerContainer; } @@ -1048,6 +1056,11 @@ private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties, retryTemplate.setBackOffPolicy(backOff); rabbitTemplate.setRetryTemplate(retryTemplate); } + // TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902 + AbstractApplicationContext applicationContext = getApplicationContext(); + applicationContext.getBeanProvider(ObservationRegistry.class) + .ifAvailable((observationRegistry) -> rabbitTemplate.setObservationEnabled(true)); + rabbitTemplate.setApplicationContext(applicationContext); rabbitTemplate.afterPropertiesSet(); return rabbitTemplate; } diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java index 8dd501dc59..4c10134a62 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java @@ -22,7 +22,7 @@ import com.rabbitmq.stream.Environment; import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.ObservableListenerContainer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -52,6 +52,7 @@ * spring-rabbit-stream. * * @author Gary Russell + * @author Artem Bilan * @since 3.2 * */ @@ -67,11 +68,10 @@ private StreamUtils() { * @param group the group. * @param properties the properties. * @param destination the destination. - * @param extension the properties extension. * @param applicationContext the application context. * @return the container. */ - public static MessageListenerContainer createContainer(ConsumerDestination consumerDestination, String group, + public static ObservableListenerContainer createContainer(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties properties, String destination, ApplicationContext applicationContext) { @@ -143,7 +143,7 @@ public void fromHeadersToReply(MessageHeaders headers, MessageProperties target) * @param errorChannel the error channel * @param destination the destination. * @param extendedProperties the extended properties. - * @param abstractApplicationContext the application context. + * @param applicationContext the application context. * @param headerMapperFunction the header mapper function. * @return the handler. */ diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/ObservationAutoConfiguration.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/ObservationAutoConfiguration.java deleted file mode 100644 index 60729f028d..0000000000 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/ObservationAutoConfiguration.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2022-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rabbit.config; - -import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; -import org.springframework.amqp.rabbit.listener.MessageListenerContainer; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.cloud.stream.config.ListenerContainerCustomizer; -import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; -import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; -import org.springframework.messaging.MessageHandler; - -/** - * @author Oleg Zhurakousky - * @author Byungjun You - */ -@Configuration(proxyBeanMethods = false) -@ConditionalOnBean(org.springframework.boot.actuate.autoconfigure.observation.ObservationAutoConfiguration.class) -public class ObservationAutoConfiguration { - - @Bean - @Order(Ordered.HIGHEST_PRECEDENCE) - ListenerContainerCustomizer observedListenerContainerCustomizer( - ApplicationContext applicationContext) { - return (container, destinationName, group) -> { - if (container instanceof AbstractMessageListenerContainer abstractMessageListenerContainer) { - abstractMessageListenerContainer.setObservationEnabled(true); - abstractMessageListenerContainer.setApplicationContext(applicationContext); - } - }; - } - - @Bean - @Order(Ordered.HIGHEST_PRECEDENCE) - ProducerMessageHandlerCustomizer observedProducerMessageHandlerCustomizer( - ApplicationContext applicationContext) { - return (handler, destinationName) -> { - if (handler instanceof AmqpOutboundEndpoint amqpOutboundEndpoint) { - amqpOutboundEndpoint.getRabbitTemplate().setObservationEnabled(true); - amqpOutboundEndpoint.getRabbitTemplate().setApplicationContext(applicationContext); - } - }; - } -} diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 2936199269..a6aa77f21a 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1 @@ org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration -org.springframework.cloud.stream.binder.rabbit.config.ObservationAutoConfiguration diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitMultiBinderObservationTests.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitMultiBinderObservationTests.java new file mode 100644 index 0000000000..ae4fc13f13 --- /dev/null +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitMultiBinderObservationTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rabbit.integration; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import brave.handler.SpanHandler; +import brave.test.TestSpanHandler; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.brave.bridge.BraveFinishedSpan; +import io.micrometer.tracing.test.simple.SpansAssert; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.RabbitMQContainer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * @author Artem Bilan + * @since 4.1.1 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, + args = "--spring.config.location=classpath:/rabbit-multi-binder-observation.yml") +@DirtiesContext +@AutoConfigureObservability +public class RabbitMultiBinderObservationTests { + + private static final TestSpanHandler SPANS = new TestSpanHandler(); + + private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance(); + + @Autowired + StreamBridge streamBridge; + + @Autowired + ObservationRegistry observationRegistry; + + @Autowired + TestConfiguration testConfiguration; + + @DynamicPropertySource + static void rabbitProperties(DynamicPropertyRegistry registry) { + registry.add("spring.rabbitmq.port", RABBITMQ::getAmqpPort); + } + + @Test + void observationIsPropagatedInMultiBinderConfiguration() throws InterruptedException { + Observation.createNotStarted("test parent observation", this.observationRegistry) + .observe(() -> this.streamBridge.send("test-out-0", "test data")); + + assertThat(this.testConfiguration.messageReceived.await(10, TimeUnit.SECONDS)).isTrue(); + + // There is a race condition when we already have a reply, but the span in the + // Rabbit listener is not closed yet. + // parent -> StreamBridge -> RabbitTemplate -> Rabbit Listener -> Consumer + await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5)); + SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) + .haveSameTraceId(); + } + + @SpringBootConfiguration + @EnableAutoConfiguration + public static class TestConfiguration { + + final CountDownLatch messageReceived = new CountDownLatch(1); + + @Bean + SpanHandler testSpanHandler() { + return SPANS; + } + + @Bean + public Consumer> testListener() { + return message -> this.messageReceived.countDown(); + } + + } + +} diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/resources/rabbit-multi-binder-observation.yml b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/resources/rabbit-multi-binder-observation.yml new file mode 100644 index 0000000000..bd7961413e --- /dev/null +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/resources/rabbit-multi-binder-observation.yml @@ -0,0 +1,34 @@ +spring: + cloud: + function: + definition: testListener + stream: + output-bindings: test + bindings: + test-out-0: + binder: rabbit + destination: test + group: test + testListener-in-0: + binder: rabbit + destination: test + group: test + binders: + rabbit: + type: rabbit + environment: + spring: + cloud: + stream: + rabbit: + binder: + enableObservation: true +logging: + pattern: + level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]" + +management: + tracing: + sampling: + probability: 1 + diff --git a/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans b/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans index 92194a7bf4..fe46b0a02d 100644 --- a/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans +++ b/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans @@ -13,3 +13,4 @@ org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer org.springframework.cloud.stream.binder.test.InputDestination org.springframework.cloud.stream.binder.test.OutputDestination org.springframework.cloud.stream.config.BindingHandlerAdvise +io.micrometer.observation.ObservationRegistry