Description
Our topic definition and creation is handled at the infrastructure level (for security reasons, clients have no rights to create or manage topics), and we rely on KafkaMessageChannelBinder's capability of discovering the actual number of partitions by fetching metadata directly from the Kafka brokers.
Last night we experienced severe performance degradation, which turned out to be caused by producers suddenly sending all messages to partition 0 (effectively disabling data partitioning). We also noticed that restarting the applications helps, so after a few hours of code analysis and debugging we came to the conclusion that environment refreshes (triggered by RefreshRemoteApplicationEvent) set the partition count back to 1 (the default value) for all producers.
To Reproduce
Pre-condition: have a local Kafka cluster running with a topic (the example uses the name sample-topic) that has at least 2 partitions (tested with 4).
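If such a topic does not exist yet, it can be created up front with the standard Kafka CLI (assuming a broker reachable on localhost:9092; adjust as needed for your environment):

```shell
# Create the sample topic with 4 partitions on a local single-broker cluster
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic sample-topic --partitions 4 --replication-factor 1
```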
- Define a sample consumer that logs and tracks message partitions:
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class PartitionObserver implements Consumer<Message<String>> {

    private static final Logger log = LoggerFactory.getLogger(PartitionObserver.class);

    private final Set<Integer> observedPartitions = new HashSet<>();
    private final AtomicLong messageCounter = new AtomicLong();

    @Override
    public void accept(Message<String> message) {
        var partition = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION, Integer.class);
        log.info("Received message {} on partition {}", message.getPayload(), partition);
        observedPartitions.add(partition);
        messageCounter.incrementAndGet();
    }

    public long messageCount() {
        return messageCounter.get();
    }

    public int uniquePartitions() {
        return observedPartitions.size();
    }

    public void reset() {
        observedPartitions.clear();
        messageCounter.set(0);
    }
}
- Provide the binder configuration:
spring:
  application:
    name: kafka-partitioning-issue
  cloud:
    function:
      definition: partitionObserver
    stream:
      bindings:
        partitionObserver-in-0:
          destination: sample-topic
          group: sample-group
        outbound:
          destination: sample-topic
          producer:
            # partition-count omitted on purpose
            partition-key-expression: headers['partitioning-key']
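For context on the mapping seen in the logs below: with partition-key-expression set (and no custom partition selector), Spring Cloud Stream derives the target partition roughly as the key's hashCode modulo the configured partition count. A simplified model of that calculation (selectPartition is an illustrative helper, not framework API):

```java
public class PartitionMath {

    // Simplified model of default partition selection:
    // hashCode of the extracted key, modulo the partition count.
    static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // With 4 discovered partitions, integer keys spread across all partitions...
        System.out.println(selectPartition(5, 4)); // 1
        // ...but with the count reset to the default of 1, every key maps to partition 0.
        System.out.println(selectPartition(5, 1)); // 0
    }
}
```

This matches the observed behavior: with partitioning-key set to the message index i and 4 partitions, message i lands on partition i % 4 in the first batch, and everything lands on partition 0 once the count is reset to 1.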
- Run two batches of messages, tracking and logging how partitioning worked, with an environment refresh in between:
import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.support.MessageBuilder;

@SpringBootApplication
public class KafkaPartitioningIssueApplication implements CommandLineRunner {

    private static final Logger log =
            LoggerFactory.getLogger(KafkaPartitioningIssueApplication.class);

    @Autowired private PartitionObserver partitionObserver;
    @Autowired private StreamBridge streamBridge;
    @Autowired private ApplicationEventPublisher eventPublisher;

    @Value("${spring.application.name}")
    private String applicationName;

    public static void main(String[] args) {
        SpringApplication.run(KafkaPartitioningIssueApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        int messageCount = 8;
        // initial batch
        processBatch(messageCount);
        // refresh
        partitionObserver.reset();
        eventPublisher.publishEvent(
                new RefreshRemoteApplicationEvent(this, applicationName, applicationName + ":**"));
        // second batch
        processBatch(messageCount);
    }

    private void processBatch(int messageCount) throws InterruptedException {
        for (int i = 0; i < messageCount; ++i) {
            streamBridge.send(
                    "outbound",
                    MessageBuilder.withPayload(String.valueOf(i)).setHeader("partitioning-key", i).build());
        }
        // wait for all messages to be consumed back
        while (partitionObserver.messageCount() < messageCount) {
            Thread.sleep(Duration.ofMillis(10));
        }
        log.info("Observed partitions: {}", partitionObserver.uniquePartitions());
    }
}
- Observe the logs:
2024-12-11T09:10:14.104+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 0 on partition 0
2024-12-11T09:10:14.105+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 1 on partition 1
2024-12-11T09:10:14.106+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 5 on partition 1
2024-12-11T09:10:14.107+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 2 on partition 2
2024-12-11T09:10:14.108+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 6 on partition 2
2024-12-11T09:10:14.109+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 3 on partition 3
2024-12-11T09:10:14.109+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 7 on partition 3
2024-12-11T09:10:14.115+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 4 on partition 0
2024-12-11T09:10:14.127+01:00 INFO 5900 --- [kafka-partitioning-issue] [ main] c.e.k.KafkaPartitioningIssueApplication : Observed partitions: 4
2024-12-11T09:10:14.128+01:00 INFO 5900 --- [kafka-partitioning-issue] [ main] o.s.cloud.bus.event.RefreshListener : Received remote refresh request.
2024-12-11T09:10:14.206+01:00 INFO 5900 --- [kafka-partitioning-issue] [ main] o.s.cloud.bus.event.RefreshListener : Keys refreshed []
2024-12-11T09:10:14.213+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 0 on partition 0
2024-12-11T09:10:14.216+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 1 on partition 0
2024-12-11T09:10:14.217+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 2 on partition 0
2024-12-11T09:10:14.220+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 3 on partition 0
2024-12-11T09:10:14.221+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 4 on partition 0
2024-12-11T09:10:14.225+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 5 on partition 0
2024-12-11T09:10:14.225+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 6 on partition 0
2024-12-11T09:10:14.226+01:00 INFO 5900 --- [kafka-partitioning-issue] [container-0-C-1] c.e.k.PartitionObserver : Received message 7 on partition 0
2024-12-11T09:10:14.228+01:00 INFO 5900 --- [kafka-partitioning-issue] [ main] c.e.k.KafkaPartitioningIssueApplication : Observed partitions: 1
Version of the framework
Verified on both 3.4.0 and 3.3.4
Components used
spring-cloud-starter-stream-kafka
spring-cloud-starter-bus-kafka
Expected behavior
We should end up with the same partitioning for both batches, but after the environment refresh all messages are produced to partition 0.
Additional context
I believe the issue lies within KafkaMessageChannelBinder: the discovered partition count is stored directly in producerProperties, which by default gets re-instantiated during an environment refresh, and the partitioner accesses producerProperties directly.
@Override
protected MessageHandler createProducerMessageHandler(
        final ProducerDestination destination,
        ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
        MessageChannel channel, MessageChannel errorChannel) throws Exception {
    // ...
    Collection<PartitionInfo> partitions = provisioningProvider.getPartitionInfoForProducer(
            destination.getName(), producerFB, producerProperties);
    this.topicsInUse.put(destination.getName(),
            new TopicInformation(null, partitions, false));
    if (producerProperties.isPartitioned()
            && producerProperties.getPartitionCount() < partitions.size()) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("The `partitionCount` of the producer for topic "
                    + destination.getName() + " is "
                    + producerProperties.getPartitionCount()
                    + ", smaller than the actual partition count of "
                    + partitions.size()
                    + " for the topic. The larger number will be used instead.");
        }
        producerProperties.setPartitionCount(partitions.size()); // <-- here
        List<ChannelInterceptor> interceptors = ((InterceptableChannel) channel)
                .getInterceptors();
        interceptors.forEach((interceptor) -> {
            if (interceptor instanceof PartitioningInterceptor partitioningInterceptor) {
                partitioningInterceptor.setPartitionCount(partitions.size());
            }
            else if (interceptor instanceof DefaultPartitioningInterceptor defaultPartitioningInterceptor) {
                defaultPartitioningInterceptor.setPartitionCount(partitions.size());
            }
        });
    }
    // ...
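As a possible mitigation until this is fixed (our assumption, not a verified fix): explicitly pinning partition-count on the producer binding should make the value survive the refresh, since the re-bound properties would then start from the configured value instead of the default of 1:

```yaml
spring:
  cloud:
    stream:
      bindings:
        outbound:
          producer:
            partition-count: 4   # match the topic's actual partition count
            partition-key-expression: headers['partitioning-key']
```

The obvious downside is that the count must be kept in sync manually with the topic, which is exactly what the broker-metadata discovery was meant to avoid.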