Skip to content

Commit 3b5bb2e

Browse files
committed
Remove redundant delaySubscription from FunctionConfiguration
Related to: spring-projects/spring-integration#9362 After the fix in Spring Integration: spring-projects/spring-integration@bdcb856 we ended up in a deadlock situation with a `beginPublishingTrigger` in the `FunctionConfiguration` used for the `delaySubscription()` on an original `Publisher`. The `FluxMessageChannel` uses its own `delaySubscription()` until the channel has its subscribers. Apparently the logic before was with errors, so the `FluxMessageChannel` was marked as active even if its subscriber is not ready yet, leading to famous `Dispatcher does not have subscribers` error. So, looks like this `beginPublishingTrigger` was introduced back in days in Spring Cloud Stream to mitigate that situation until we really emit a `BindingCreatedEvent`. The deadlock (and the flaw, respectively) is with the `setupBindingTrigger()` method implementation where `FluxMessageChannel` now "really" delays a subscription to the provided `Publisher`, therefore not triggering that `Mono.create()` fulfilment immediately. The `BindingCreatedEvent` arrives earlier, than we have a subscriber on the channel, but `triggerRef.get()` is `null`, so we don't `success()` it and in the end don't subscribe to an original `Publisher` since `delaySubscription()` on it is never completed. Since `FunctionConfiguration` fully relies on `IntegrationFlow.from(Publisher)`, which ends up with the mentioned `FluxMessageChannel.subscribeTo()` and its own `delaySubscription()` (which, in turn, apparently fixed now), we don't need our own `delaySubscription()` any more. Therefore the fix in this PR is to propose to remove `beginPublishingTrigger` logic altogether.
1 parent f356599 commit 3b5bb2e

1 file changed

Lines changed: 6 additions & 26 deletions

File tree

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.reactivestreams.Publisher;
4141
import reactor.core.publisher.Flux;
4242
import reactor.core.publisher.Mono;
43-
import reactor.core.publisher.MonoSink;
4443
import reactor.util.function.Tuples;
4544

4645
import org.springframework.beans.BeansException;
@@ -65,7 +64,6 @@
6564
import org.springframework.cloud.function.context.message.MessageUtils;
6665
import org.springframework.cloud.stream.binder.BinderFactory;
6766
import org.springframework.cloud.stream.binder.BinderHeaders;
68-
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
6967
import org.springframework.cloud.stream.binder.ConsumerProperties;
7068
import org.springframework.cloud.stream.binder.PartitionHandler;
7169
import org.springframework.cloud.stream.binder.ProducerProperties;
@@ -129,6 +127,7 @@
129127
* @author Byungjun You
130128
* @author Ivan Shapoval
131129
* @author Patrik Péter Süli
130+
* @author Artem Bilan
132131
* @since 2.1
133132
*/
134133
@Lazy(false)
@@ -224,8 +223,6 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
224223
functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
225224
}
226225

227-
Publisher<Object> beginPublishingTrigger = setupBindingTrigger(context);
228-
229226
if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) {
230227
String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";
231228

@@ -239,7 +236,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
239236

240237
if (functionWrapper != null) {
241238
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties),
242-
beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName)
239+
pollable, context, taskScheduler, producerProperties, outputName)
243240
.route(Message.class, message -> {
244241
if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
245242
String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination");
@@ -253,7 +250,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
253250
}
254251
else {
255252
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties),
256-
beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName)
253+
pollable, context, taskScheduler, producerProperties, outputName)
257254
.channel(c -> c.direct())
258255
.fluxTransform((Function<? super Flux<Message<Object>>, ? extends Publisher<Object>>) function)
259256
.route(Message.class, message -> {
@@ -274,26 +271,9 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
274271
};
275272
}
276273

277-
278-
/*
279-
* Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created
280-
*/
281-
private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
282-
AtomicReference<MonoSink<Object>> triggerRef = new AtomicReference<>();
283-
Publisher<Object> beginPublishingTrigger = Mono.create(triggerRef::set);
284-
context.addApplicationListener(event -> {
285-
if (event instanceof BindingCreatedEvent) {
286-
if (triggerRef.get() != null) {
287-
triggerRef.get().success();
288-
}
289-
}
290-
});
291-
return beginPublishingTrigger;
292-
}
293-
294274
@SuppressWarnings({ "rawtypes", "unchecked" })
295275
private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier,
296-
Publisher<Object> beginPublishingTrigger, PollableBean pollable, GenericApplicationContext context,
276+
PollableBean pollable, GenericApplicationContext context,
297277
TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) {
298278

299279
IntegrationFlowBuilder integrationFlowBuilder;
@@ -309,8 +289,8 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> s
309289
if (pollable == null && reactive) {
310290
Publisher publisher = (Publisher) supplier.get();
311291
publisher = publisher instanceof Mono
312-
? ((Mono) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary)
313-
: ((Flux) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
292+
? ((Mono) publisher).map(this::wrapToMessageIfNecessary)
293+
: ((Flux) publisher).map(this::wrapToMessageIfNecessary);
314294

315295
integrationFlowBuilder = IntegrationFlow.from(publisher);
316296

0 commit comments

Comments
 (0)