Skip to content

Default partitioning not respected with StreamBridge #2815

@dejank1986

Description

@dejank1986

Describe the issue
After upgrading to version 4.0.4 we noticed that default partitioning is not working as expected (key.hashCode() % partitionCount) when using StreamBridge. It can be possibly associated with this commented block:

// Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759
. Before, if condition is met, DefaultPartitionInterceptor was added and partitionHandler.determinePartition(message) was called, which is not the case now in version 4.0.4.

To Reproduce
We are using Spring Cloud Stream functions. The message key is a very simple record schema.

Steps to reproduce the behavior:

  1. Set the configuration properties
spring.cloud.stream:
  default:
    producer:
      useNativeEncoding: true
      partition-key-expression: headers['kafka_messageKey'].id
    consumer.useNativeDecoding: true
  function.autodetect: false
  bindings:
    testEventSource-out-0:
      destination: test-event
  kafka:
    binder:
      auto-create-topics: false
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        key.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
  1. Run the following integration test
void sendEvent_whenUsingStreamBridge_shouldRespectPartitioningScheme() {
        final int numberOfPartitions = 6;

        final Message<KafkaNull> outgoingMessage = MessageBuilder.withPayload(KafkaNull.INSTANCE)
                .setHeader(KafkaHeaders.KEY, TestEventKey.newBuilder().setId(1001L).build())
                .build();

        streamBridge.send("testEventSource-out-0", outgoingMessage);

        final ConsumerRecords<Object, Object> records = testConsumer.poll(Duration.ofSeconds(5L));
        assertThat(records.iterator().next().partition()).isEqualTo(eventId % numberOfPartitions);
    }

We have used KafkaNull.INSTANCE but it's the same behaviour with the regular message.
4. See the failed assertion

Version of the framework
SpringBoot 3.1.2
SpringCloudStream 4.0.4

Expected behavior
When using defaults partition should be selected based on the following function: (key.hashCode() % partitionCount).

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions