Kafka Binder's partitionCount (auto-discovered from the broker) gets reset after environment refresh #3054

@wojkur

Description
Our topic definition and creation is handled at the infrastructure level (for security reasons, clients have no rights to create and manage topics), so we rely on KafkaMessageChannelBinder's capability of discovering the actual number of partitions by fetching metadata directly from the Kafka brokers.
Last night we experienced severe performance degradation, which turned out to be caused by producers suddenly sending all messages to partition 0 (which effectively disabled data partitioning). We also noticed that restarting the applications helped, so after a few hours of code analysis and debugging we came to the conclusion that an environment refresh (triggered by RefreshRemoteApplicationEvent) sets the partition count back to 1 (the default value) for all producers.
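To illustrate why a reset partition count collapses everything onto partition 0: with a partition-key-expression configured, the default partition selection boils down to taking the key's hash modulo the partition count, so a count of 1 always yields partition 0. A minimal arithmetic sketch (a hypothetical helper, not Spring Cloud Stream's actual implementation):

```java
// Minimal sketch of modulo-based partition selection.
// PartitionSelectionSketch and selectPartition are hypothetical names,
// not Spring Cloud Stream API.
public class PartitionSelectionSketch {

  static int selectPartition(Object partitionKey, int partitionCount) {
    // Hash the key and take it modulo the partition count.
    return Math.abs(partitionKey.hashCode()) % partitionCount;
  }

  public static void main(String[] args) {
    // With the broker-discovered count of 4, keys spread across partitions.
    for (int key = 0; key < 8; key++) {
      System.out.println("count=4, key=" + key + " -> partition " + selectPartition(key, 4));
    }
    // After the refresh resets the count to the default of 1,
    // every key maps to partition 0.
    for (int key = 0; key < 8; key++) {
      System.out.println("count=1, key=" + key + " -> partition " + selectPartition(key, 1));
    }
  }
}
```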

To Reproduce
Pre-condition: have a local Kafka cluster running with a topic (the example uses the name sample-topic) that has at least 2 partitions (tested with 4).
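For a local setup, the topic from the pre-condition can be created with the Kafka CLI (assuming a broker listening on localhost:9092; adjust to your environment):

```shell
# Create sample-topic with 4 partitions on a local single-broker cluster.
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic sample-topic \
  --partitions 4 --replication-factor 1
```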

  1. Define a sample consumer capable of logging and tracking messages' partitions:
@Component
public class PartitionObserver implements Consumer<Message<String>> {

  private static final Logger log = LoggerFactory.getLogger(PartitionObserver.class);

  private final Set<Integer> observedPartitions = new HashSet<>();
  private final AtomicLong messageCounter = new AtomicLong();

  @Override
  public void accept(Message<String> message) {
    var partition = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION, Integer.class);
    log.info("Received message {} on partition {}", message.getPayload(), partition);
    observedPartitions.add(partition);
    messageCounter.incrementAndGet();
  }

  public long messageCount() {
    return messageCounter.get();
  }

  public int uniquePartitions() {
    return observedPartitions.size();
  }

  public void reset() {
    observedPartitions.clear();
    messageCounter.set(0);
  }
}
  2. Provide the binder configuration:
spring:
  application:
    name: kafka-partitioning-issue
  cloud:
    function:
      definition: partitionObserver
    stream:
      bindings:
        partitionObserver-in-0:
          destination: sample-topic
          group: sample-group
        outbound:
          destination: sample-topic
          producer:
            # partition-count omitted on purpose
            partition-key-expression: headers['partitioning-key']
  3. Run two batches of messages, tracking and logging how partitioning worked, with an environment refresh in between:
@SpringBootApplication
public class KafkaPartitioningIssueApplication implements CommandLineRunner {

  private static final Logger log =
      LoggerFactory.getLogger(KafkaPartitioningIssueApplication.class);

  @Autowired private PartitionObserver partitionObserver;
  @Autowired private StreamBridge streamBridge;
  @Autowired private ApplicationEventPublisher eventPublisher;

  @Value("${spring.application.name}")
  private String applicationName;

  public static void main(String[] args) {
    SpringApplication.run(KafkaPartitioningIssueApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    int messageCount = 8;

    // initial batch
    processBatch(messageCount);

    // refresh
    partitionObserver.reset();
    eventPublisher.publishEvent(
        new RefreshRemoteApplicationEvent(this, applicationName, applicationName + ":**"));

    // second batch
    processBatch(messageCount);
  }

  private void processBatch(int messageCount) throws InterruptedException {
    for (int i = 0; i < messageCount; ++i) {
      streamBridge.send(
          "outbound",
          MessageBuilder.withPayload(String.valueOf(i)).setHeader("partitioning-key", i).build());
    }
    // wait for all messages to be consumed back
    while (partitionObserver.messageCount() < messageCount) {
      Thread.sleep(Duration.ofMillis(10));
    }
    log.info("Observed partitions: {}", partitionObserver.uniquePartitions());
  }
}
  4. Observe the logs:
2024-12-11T09:10:14.104+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 0 on partition 0
2024-12-11T09:10:14.105+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 1 on partition 1
2024-12-11T09:10:14.106+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 5 on partition 1
2024-12-11T09:10:14.107+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 2 on partition 2
2024-12-11T09:10:14.108+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 6 on partition 2
2024-12-11T09:10:14.109+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 3 on partition 3
2024-12-11T09:10:14.109+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 7 on partition 3
2024-12-11T09:10:14.115+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 4 on partition 0
2024-12-11T09:10:14.127+01:00  INFO 5900 --- [kafka-partitioning-issue] [           main] c.e.k.KafkaPartitioningIssueApplication  : Observed partitions: 4
2024-12-11T09:10:14.128+01:00  INFO 5900 --- [kafka-partitioning-issue] [           main] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request.
2024-12-11T09:10:14.206+01:00  INFO 5900 --- [kafka-partitioning-issue] [           main] o.s.cloud.bus.event.RefreshListener      : Keys refreshed []
2024-12-11T09:10:14.213+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 0 on partition 0
2024-12-11T09:10:14.216+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 1 on partition 0
2024-12-11T09:10:14.217+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 2 on partition 0
2024-12-11T09:10:14.220+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 3 on partition 0
2024-12-11T09:10:14.221+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 4 on partition 0
2024-12-11T09:10:14.225+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 5 on partition 0
2024-12-11T09:10:14.225+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 6 on partition 0
2024-12-11T09:10:14.226+01:00  INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver                  : Received message 7 on partition 0
2024-12-11T09:10:14.228+01:00  INFO 5900 --- [kafka-partitioning-issue] [           main] c.e.k.KafkaPartitioningIssueApplication  : Observed partitions: 1

Version of the framework
Verified on both 3.4.0 and 3.3.4

Components used
spring-cloud-starter-stream-kafka
spring-cloud-starter-bus-kafka

Expected behavior
We should end up with the same partitioning for both batches, but after the environment refresh all messages are produced to partition 0.

Additional context
I believe the issue lies within KafkaMessageChannelBinder: the discovered partition count is stored directly in producerProperties, which by default gets re-instantiated (with default values) during an environment refresh, and the partitioner accesses producerProperties directly.

@Override
protected MessageHandler createProducerMessageHandler(
	final ProducerDestination destination,
	ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
	MessageChannel channel, MessageChannel errorChannel) throws Exception {
// ...
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionInfoForProducer(
		destination.getName(), producerFB, producerProperties);
this.topicsInUse.put(destination.getName(),
		new TopicInformation(null, partitions, false));
if (producerProperties.isPartitioned()
		&& producerProperties.getPartitionCount() < partitions.size()) {
	if (this.logger.isInfoEnabled()) {
		this.logger.info("The `partitionCount` of the producer for topic "
				+ destination.getName() + " is "
				+ producerProperties.getPartitionCount()
				+ ", smaller than the actual partition count of "
				+ partitions.size()
				+ " for the topic. The larger number will be used instead.");
	}
	producerProperties.setPartitionCount(partitions.size()); // <-- here
	List<ChannelInterceptor> interceptors = ((InterceptableChannel) channel)
			.getInterceptors();
	interceptors.forEach((interceptor) -> {
		if (interceptor instanceof PartitioningInterceptor partitioningInterceptor) {
			partitioningInterceptor.setPartitionCount(partitions.size());
		}
		else if (interceptor instanceof DefaultPartitioningInterceptor defaultPartitioningInterceptor) {
			defaultPartitioningInterceptor.setPartitionCount(partitions.size());
		}
	});
}
// ...
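
Until this is addressed, one workaround (assuming the infrastructure-managed partition count is known and stable, here 4) is to pin partition-count explicitly on the producer binding, so the value no longer depends on broker discovery surviving the refresh:

```yaml
spring:
  cloud:
    stream:
      bindings:
        outbound:
          destination: sample-topic
          producer:
            # Pin the count explicitly so an environment refresh cannot
            # reset it to the default of 1 (must match the actual topic).
            partition-count: 4
            partition-key-expression: headers['partitioning-key']
```

This of course trades away the auto-discovery we rely on, and the value has to be kept in sync with the topic definition on the infrastructure side.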
