Describe the issue
According to the documentation (https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener), if you provide a single KafkaBindingRebalanceListener bean in the application context, it will be wired into all Kafka consumer bindings.
However, it is not wired into a custom binder that has non-empty properties.
To Reproduce
application.yml:
spring:
  cloud:
    function:
      definition: testConsumer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      binders:
        test-binder:
          type: 'kafka'
          environment:
            spring.cloud.stream.kafka.binder:
              consumer-properties:
                max.poll.records: 1 # < at least one property here
      bindings:
        testConsumer-in-0:
          binder: test-binder
          group: test-group
          destination: test-topic
Kotlin reproducer:
@SpringBootApplication
class Application {
    @Bean
    fun testConsumer() = java.util.function.Consumer<GenericMessage<ByteArray>> {
    }
}

@Component
class RebalanceListener : KafkaBindingRebalanceListener {
    override fun onPartitionsAssigned(bindingName: String, consumer: Consumer<*, *>, partitions: Collection<TopicPartition?>?, initial: Boolean) {
        // In the successful case this should be printed
        println("RebalanceListener is wired and called")
    }
}

fun main() {
    runApplication<Application>()
}
Dependencies:
plugins {
    kotlin("jvm") version "2.2.20"
    kotlin("plugin.spring") version "2.2.20"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation(platform("org.springframework.boot:spring-boot-dependencies:4.0.0-M3"))
    implementation(platform("org.springframework.cloud:spring-cloud-dependencies:2025.1.0-M3"))
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
}
If you change the following in application.yml
consumer-properties:
  max.poll.records: 1
to
the problem does not reproduce, and KafkaBindingRebalanceListener is wired and called.
This may be related to the way the child application context is created: https://github.com/spring-cloud/spring-cloud-stream/blob/v5.0.0-M3/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java#L477
ConfigurableApplicationContext initializeBinderContextSimple(String configurationName, Map<String, Object> binderProperties,
        BinderType binderType, BinderConfiguration binderConfiguration, boolean refresh) {
    ...
    boolean useApplicationContextAsParent = binderProperties.isEmpty()
            && this.context != null;
    ...
    if (useApplicationContextAsParent) {
        binderProducingContext.setParent(this.context);
    }
If the properties are not empty, the child context has no reference to the parent context, while KafkaBindingRebalanceListener is defined in the parent context.
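To make the suspected mechanism concrete, here is a toy model in plain Java. It is only a sketch: ToyContext and BinderContextDemo are hypothetical names, not Spring or Spring Cloud Stream classes, and the only part taken from the excerpt above is the useApplicationContextAsParent condition. It shows why a bean registered in the application context would be invisible to a binder context that was created without a parent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy stand-in for a hierarchical application context (hypothetical, NOT Spring's API).
final class ToyContext {
    private final Map<String, Object> beans = new HashMap<>();
    private ToyContext parent; // null means this context is isolated

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    void setParent(ToyContext parent) {
        this.parent = parent;
    }

    // Look in this context first, then fall back to the parent if one is set.
    Optional<Object> find(String name) {
        Object local = beans.get(name);
        if (local != null) {
            return Optional.of(local);
        }
        return parent == null ? Optional.empty() : parent.find(name);
    }
}

final class BinderContextDemo {
    // Mirrors the quoted condition: the parent is attached only when binderProperties is empty.
    static ToyContext createBinderContext(ToyContext application, Map<String, Object> binderProperties) {
        ToyContext binderContext = new ToyContext();
        boolean useApplicationContextAsParent = binderProperties.isEmpty() && application != null;
        if (useApplicationContextAsParent) {
            binderContext.setParent(application);
        }
        return binderContext;
    }

    public static void main(String[] args) {
        ToyContext application = new ToyContext();
        application.register("rebalanceListener", new Object());

        ToyContext withoutProps = createBinderContext(application, Map.of());
        ToyContext withProps = createBinderContext(application, Map.of("max.poll.records", 1));

        System.out.println(withoutProps.find("rebalanceListener").isPresent()); // true
        System.out.println(withProps.find("rebalanceListener").isPresent());    // false
    }
}
```

In this model a single binder property is enough to cut the binder context off from the listener, which matches the behaviour observed with the reproducer. Whether the real binder context is meant to reach application beans some other way when properties are set is not something this sketch can answer.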
Version of the framework: 2025.1.0-M3
Expected behavior: KafkaBindingRebalanceListener should be wired to all binders.