Skip to content

Commit a88c34f

Browse files
agustino-limolegz
authored andcommitted
GH-3033: Register ObservationRegistry for Dynamic MessageChannels
Fixes: gh-3033 * ensure `ObservationRegistry` is registered on dynamically created `MessageChannel` instances in `StreamBridge`
1 parent f392488 commit a88c34f

6 files changed

Lines changed: 45 additions & 4 deletions

File tree

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderObservationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ void endToEndReactorKafkaBinder1() {
100100
streamBridge.send("rkbot-in-topic", MessageBuilder.withPayload("data")
101101
.build());
102102

103-
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
103+
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(4));
104104
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
105105
.haveSameTraceId();
106106
}

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,21 @@ void dynamicProducerDestination() {
856856
assertThat(new String(message.getPayload())).isEqualTo("JOHN DOE");
857857
}
858858

859+
@Test
860+
void test_3033() {
861+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
862+
TestChannelBinderConfiguration.getCompleteConfiguration(
863+
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
864+
"--spring.cloud.stream.source=outputA",
865+
"--spring.jmx.enabled=false")) {
866+
StreamBridge streamBridge = context.getBean(StreamBridge.class);
867+
streamBridge.send("outputA", MessageBuilder.withPayload("A").build());
868+
869+
OutputDestination output = context.getBean(OutputDestination.class);
870+
assertThat(output.receive(1000, "outputA").getHeaders().containsKey("traceparent")).isTrue();
871+
}
872+
}
873+
859874
@EnableAutoConfiguration
860875
public static class DynamicProducerDestinationConfig {
861876
@Bean

core/spring-cloud-stream-test-binder/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@
2424
<groupId>org.springframework.boot</groupId>
2525
<artifactId>spring-boot-starter-test</artifactId>
2626
</dependency>
27+
<dependency>
28+
<groupId>org.springframework.boot</groupId>
29+
<artifactId>spring-boot-starter-actuator</artifactId>
30+
</dependency>
31+
<dependency>
32+
<groupId>io.micrometer</groupId>
33+
<artifactId>micrometer-tracing-bridge-brave</artifactId>
34+
</dependency>
2735
</dependencies>
2836

2937
</project>

core/spring-cloud-stream-test-binder/src/main/java/org/springframework/cloud/stream/binder/test/TestChannelBinderConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222

23+
import io.micrometer.observation.ObservationRegistry;
24+
2325
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2426
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
2527
import org.springframework.cloud.stream.binder.Binder;
@@ -107,4 +109,8 @@ public TestChannelBinderProvisioner springIntegrationProvisioner() {
107109
return new TestChannelBinderProvisioner();
108110
}
109111

112+
@Bean
113+
public ObservationRegistry observationRegistry() {
114+
return ObservationRegistry.create();
115+
}
110116
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.stream.StreamSupport;
3737

3838
import io.micrometer.context.ContextSnapshotFactory;
39+
import io.micrometer.observation.ObservationRegistry;
3940
import org.apache.commons.logging.Log;
4041
import org.apache.commons.logging.LogFactory;
4142
import org.reactivestreams.Publisher;
@@ -46,6 +47,7 @@
4647

4748
import org.springframework.beans.BeansException;
4849
import org.springframework.beans.factory.InitializingBean;
50+
import org.springframework.beans.factory.ObjectProvider;
4951
import org.springframework.beans.factory.config.BeanDefinition;
5052
import org.springframework.beans.factory.support.RootBeanDefinition;
5153
import org.springframework.boot.autoconfigure.AutoConfiguration;
@@ -151,8 +153,10 @@ public class FunctionConfiguration {
151153
@Bean
152154
public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog,
153155
BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext,
154-
@Nullable NewDestinationBindingCallback callback) {
155-
return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback);
156+
@Nullable NewDestinationBindingCallback callback,
157+
ObjectProvider<ObservationRegistry> observationRegistries) {
158+
return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback,
159+
observationRegistries);
156160
}
157161

158162
@Bean

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929

3030
import io.micrometer.context.ContextExecutorService;
3131
import io.micrometer.context.ContextSnapshotFactory;
32+
import io.micrometer.observation.ObservationRegistry;
3233
import org.apache.commons.logging.Log;
3334
import org.apache.commons.logging.LogFactory;
3435

3536
import org.springframework.beans.factory.DisposableBean;
37+
import org.springframework.beans.factory.ObjectProvider;
3638
import org.springframework.beans.factory.SmartInitializingSingleton;
3739
import org.springframework.cloud.function.context.FunctionCatalog;
3840
import org.springframework.cloud.function.context.FunctionRegistration;
@@ -126,14 +128,15 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
126128

127129
private static final ReentrantLock lock = new ReentrantLock();
128130

131+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
129132
/**
130133
*
131134
* @param functionCatalog instance of {@link FunctionCatalog}
132135
* @param bindingServiceProperties instance of {@link BindingServiceProperties}
133136
* @param applicationContext instance of {@link ConfigurableApplicationContext}
134137
*/
135138
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
136-
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) {
139+
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider<ObservationRegistry> observationRegistries) {
137140
this.executorService = Executors.newCachedThreadPool();
138141
Assert.notNull(functionCatalog, "'functionCatalog' must not be null");
139142
Assert.notNull(applicationContext, "'applicationContext' must not be null");
@@ -158,6 +161,7 @@ protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
158161
};
159162
this.functionInvocationHelper = applicationContext.getBean(FunctionInvocationHelper.class);
160163
this.streamBridgeFunctionCache = new HashMap<>();
164+
observationRegistries.ifAvailable(registry -> this.observationRegistry = registry);
161165
}
162166

163167
@Override
@@ -288,8 +292,12 @@ MessageChannel resolveDestination(String destinationName, ProducerProperties pro
288292
messageChannel = this.isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
289293
((AbstractSubscribableChannel) messageChannel).setApplicationContext(applicationContext);
290294
((AbstractSubscribableChannel) messageChannel).setComponentName(destinationName);
295+
//<<<<<<< HEAD
291296

292297
BinderWrapper binderWrapper = bindingService.createBinderWrapper(binderName, destinationName, messageChannel.getClass());
298+
//=======
299+
((AbstractSubscribableChannel) messageChannel).registerObservationRegistry(observationRegistry);
300+
//>>>>>>> a1418283c (GH-3033: Register ObservationRegistry for Dynamic MessageChannels)
293301
if (this.destinationBindingCallback != null) {
294302
Object extendedProducerProperties = this.bindingService
295303
.getExtendedProducerProperties(binderWrapper.binder(), destinationName);

0 commit comments

Comments
 (0)