Describe the issue
After upgrading to version 4.0.4, we noticed that default partitioning (key.hashCode() % partitionCount) no longer works as expected when using StreamBridge. It is possibly related to this commented-out block (spring-cloud-stream/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java, line 263 in e06d4ee):
// Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759
Previously, if the condition was met, DefaultPartitionInterceptor was added and partitionHandler.determinePartition(message) was called, which is no longer the case in version 4.0.4.
To Reproduce
We are using Spring Cloud Stream functions. The message key is a very simple record schema.
Steps to reproduce the behavior:
- Set the configuration properties
spring.cloud.stream:
  default:
    producer:
      useNativeEncoding: true
      partition-key-expression: headers['kafka_messageKey'].id
    consumer.useNativeDecoding: true
  function.autodetect: false
  bindings:
    testEventSource-out-0:
      destination: test-event
  kafka:
    binder:
      auto-create-topics: false
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        key.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- Run the following integration test
void sendEvent_whenUsingStreamBridge_shouldRespectPartitioningScheme() {
    final int numberOfPartitions = 6;
    // Id carried in the message key; the partition key expression resolves to this value.
    final long eventId = 1001L;
    final Message<KafkaNull> outgoingMessage = MessageBuilder.withPayload(KafkaNull.INSTANCE)
            .setHeader(KafkaHeaders.KEY, TestEventKey.newBuilder().setId(eventId).build())
            .build();
    streamBridge.send("testEventSource-out-0", outgoingMessage);
    final ConsumerRecords<Object, Object> records = testConsumer.poll(Duration.ofSeconds(5L));
    // Expected partition under default partitioning: key.hashCode() % partitionCount.
    assertThat(records.iterator().next().partition()).isEqualTo((int) (eventId % numberOfPartitions));
}
We used KafkaNull.INSTANCE as the payload, but the behaviour is the same with a regular message.
- See the failed assertion
Version of the framework
Spring Boot 3.1.2
Spring Cloud Stream 4.0.4
Expected behavior
With the default settings, the partition should be selected by the following function: (key.hashCode() % partitionCount).
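For illustration, here is a minimal sketch of the arithmetic we expect, using the values from the test above (key id 1001, 6 partitions). The class ExpectedPartition and its method are hypothetical names made up for this example, not part of Spring Cloud Stream, and the use of Math.floorMod is our own assumption to keep the result non-negative for keys with a negative hash code; it is not taken from the library's code.

```java
// Hypothetical helper illustrating the expected default partitioning arithmetic.
// It is NOT the Spring Cloud Stream implementation, only the formula from this issue.
final class ExpectedPartition {

    // Returns key.hashCode() % partitionCount, normalised to the range [0, partitionCount).
    static int expectedPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod is an assumption made here so negative hash codes still map to a valid partition.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The partition key expression in the configuration resolves to the id field (a Long).
        Long partitionKey = 1001L;
        int numberOfPartitions = 6;
        System.out.println(expectedPartition(partitionKey, numberOfPartitions));
    }
}
```

With these values the id hashes to 1001, and 1001 % 6 is 5, so we expect the record to land on partition 5.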