Skip to content

Conversation

@omercelikceng
Copy link
Contributor

Hello, I have approximately 70 microservices. I also run each service at scale (scale number 3). If I start the services at the same time, I see some errors. I will put the error information below.

Then I reviewed the code. Actually, the code block that I think causes the error is the getPartitionsForTopic method in the KafkaTopicProvisioner class.

ErrorPhoto

When I examine the code, it is deliberately ensured that it is not retried. But when I edit the code this way, it retries and does not throw an error. I think we need to retry here. Can you review?

Error :

023-11-12 14:28:33.918  WARN 7456 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : The number of expected partitions was: 1, but 0 has been found instead.There will be 1 idle consumers
2023-11-12 14:28:41.045 ERROR 7456 --- [           main] o.s.c.s.b.BindingService                 : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:480) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:?]
	at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.13.jar:5.3.13]
	at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.13.jar:5.3.13]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144) ~[spring-boot-2.4.13.jar:2.4.13]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:771) ~[spring-boot-2.4.13.jar:2.4.13]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:763) ~[spring-boot-2.4.13.jar:2.4.13]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:438) ~[spring-boot-2.4.13.jar:2.4.13]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:339) ~[spring-boot-2.4.13.jar:2.4.13]
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123) ~[spring-boot-test-2.4.13.jar:2.4.13]
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99) ~[spring-test-5.3.13.jar:5.3.13]
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124) ~[spring-test-5.3.13.jar:5.3.13]
	at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124) ~[spring-test-5.3.13.jar:5.3.13]
	at org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener.postProcessFields(MockitoTestExecutionListener.java:110) ~[spring-boot-test-2.4.13.jar:2.4.13]
	at org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener.injectFields(MockitoTestExecutionListener.java:94) ~[spring-boot-test-2.4.13.jar:2.4.13]
	at org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener.prepareTestInstance(MockitoTestExecutionListener.java:61) ~[spring-boot-test-2.4.13.jar:2.4.13]
	at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:248) ~[spring-test-5.3.13.jar:5.3.13]
	at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:138) ~[spring-test-5.3.13.jar:5.3.13]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$6(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:355) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$7(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312) ~[?:?]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[?:?]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[?:?]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) ~[?:?]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:349) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$4(ClassBasedTestDescriptor.java:270) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:269) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$2(ClassBasedTestDescriptor.java:259) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at java.util.Optional.orElseGet(Optional.java:369) ~[?:?]
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$3(ClassBasedTestDescriptor.java:258) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:101) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:100) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:65) ~[junit-jupiter-engine-5.7.2.jar:5.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$1(NodeTestTask.java:111) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:111) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:79) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at java.util.ArrayList.forEach(ArrayList.java:1540) ~[?:?]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at java.util.ArrayList.forEach(ArrayList.java:1540) ~[?:?]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) ~[junit-platform-engine-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) ~[junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) ~[junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) ~[junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) [junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) [junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) [junit-platform-launcher-1.7.2.jar:1.7.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) [junit-platform-launcher-1.7.2.jar:1.7.2]
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69) [junit5-rt.jar:?]
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) [junit-rt.jar:?]
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) [junit-rt.jar:?]
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) [junit-rt.jar:?]
Caused by: java.lang.NullPointerException
	at java.util.ArrayList.addAll(ArrayList.java:701) ~[?:?]
	at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:623) ~[spring-cloud-stream-binder-kafka-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:159) ~[spring-cloud-stream-binder-kafka-3.1.6.jar:3.1.6]
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:426) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
	... 93 more

}
// do a sanity check on the partition set
int partitionSize = CollectionUtils.isEmpty(partitions) ? 0 : partitions.size();
int partitionSize = partitions.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If partittions is null that will cause an NPE, but that is ok, since that is caught by the catch block and retried. Is that the thinking you had?

Copy link
Contributor Author

@omercelikceng omercelikceng Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When Partitions returns null, it does not retry in this method. Because the tolerateLowerPartitionsOnBroker value for consumer is true.
It is retried after 30 seconds with a schedule. However, the service has already started. This situation is then retried asynchronously. But I think it would be better to retry it here.

The retrying code blocks are below.

BindingService.class
	public <T> Binding<T> doBindConsumer(T input, String inputName,
			Binder<T, ConsumerProperties, ?> binder,
			ConsumerProperties consumerProperties, String target) {
		if (this.taskScheduler == null
				|| this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
			return binder.bindConsumer(target,
					this.bindingServiceProperties.getGroup(inputName), input,
					consumerProperties);
		}
		else {
			try {
				return binder.bindConsumer(target,
						this.bindingServiceProperties.getGroup(inputName), input,
						consumerProperties);
			}
			catch (RuntimeException e) {
				LateBinding<T> late = new LateBinding<T>(target,
						e.getCause() == null ? e.toString() : e.getCause().getMessage(), consumerProperties, true, this.objectMapper);
				rescheduleConsumerBinding(input, inputName, binder, consumerProperties,
						target, late, e);
				this.consumerBindings.put(inputName, Collections.singletonList(late));
				return late;
			}
		}
	}

	public <T> void rescheduleConsumerBinding(final T input, final String inputName,
			final Binder<T, ConsumerProperties, ?> binder,
			final ConsumerProperties consumerProperties, final String target,
			final LateBinding<T> late, RuntimeException exception) {
		assertNotIllegalException(exception);
		this.log.error("Failed to create consumer binding; retrying in "
				+ this.bindingServiceProperties.getBindingRetryInterval() + " seconds",
				exception);
		this.scheduleTask(() -> {
			try {
				late.setDelegate(binder.bindConsumer(target,
						this.bindingServiceProperties.getGroup(inputName), input,
						consumerProperties));
			}
			catch (RuntimeException e) {
				rescheduleConsumerBinding(input, inputName, binder, consumerProperties,
						target, late, e);
			}
		});
	}

@sobychacko
Copy link
Contributor

sobychacko commented Nov 30, 2023

Majority of the code changes in this PR are not in getPartitionsForTopic, but rather in createTopicAndPartitions. Was that intentional? Otherwisde, the code looks good to me.

@omercelikceng
Copy link
Contributor Author

Majority of the code changes in this PR are not in getPartitionsForTopic, but rather in createTopicAndPartitions. Was that intentional? Otherwisde, the code looks good to me.

The flow that caused me to write these codes is as follows. The relevant methods are called in order. Then the getPartitionInfo method is called. But here the tolerateLowerPartitionsOnBroker value is sent as true. That's why that block of code doesn't retry.

AbstractMessageChannelBinder.class -> doBindConsumer() -> createConsumerEndpoint() -> processTopic() -> getPartitionInfo()

private Collection<PartitionInfo> getPartitionInfo(String topic,
		final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
		final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
	return provisioningProvider.getPartitionsForTopic(partitionCount,
			extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
			() -> {
				try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
					return consumer.partitionsFor(topic);
				}
			}, topic);
}

// we can determine
// that the exception was due to an unknown topic on the broker, just
// simply rethrow that.
if (ex instanceof UnknownTopicOrPartitionException) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't comment on the block above. That's why I'm writing here.

// This call may return null or throw an exception.
					partitions = callable.call();

This block returns null. However, if tolerateLowerPartitionsOnBroker is true in the block below, it is not retried. As I understand it, the tolerateLowerPartitionsOnBroker parameter does the following: If the actual number of partitions is less than the expected number of partitions, should this situation be tolerated?
However, when the partition comes to null, I do not have the actual number of partitions. Actually, it came back as null for another reason, so I need to retry this immediately. For this reason, I wrote a code that throws an exception if partitions are null.

@sobychacko
Copy link
Contributor

@omercelikceng Thanks for the explanations. I am taking it locally for a spin, and if all goes well, I will merge shortly.

@omercelikceng
Copy link
Contributor Author

omercelikceng commented Nov 30, 2023

@omercelikcengAçıklamalar için teşekkürler. Yerel olarak bir tur atacağım ve her şey yolunda giderse kısa süre içinde birleşeceğim.

@sobychacko Can you merge the first other pull request? Otherwise, 1 test throws an error in this code. Since the test threw an error, I was able to find the other error.

@sobychacko
Copy link
Contributor

@omercelikceng Thank you for the PR! Merged upstream.

@sobychacko sobychacko closed this Dec 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants