Skip to content

TestBinder+Kafka not working as intended (tombstone records) #2971

@rnferreira

Description

@rnferreira

Greetings, everyone! I encountered an issue with the most recent release of Spring Cloud Stream + Kafka binder (release train 2023.0.2) when using KafkaNull (tombstone records) and TestBinder. With version 2023.0.1, all tests asserting the output of functions that returned KafkaNull passed, whereas, in this new version, they don't. The output is now transformed into byte[], and the payload now has the byte equivalent of "{}".

Here are the relevant snippets (extremely minimalist):

build.gradle:

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.1'
    id 'io.spring.dependency-management' version '1.1.5'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2023.0.2")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation "org.springframework.cloud:spring-cloud-starter-stream-kafka"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
//    testImplementation 'org.springframework.cloud:spring-cloud-starter-contract-stub-runner'

}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('test') {
    useJUnitPlatform()
}

Function configuration:

@SpringBootApplication
public class Gh2971Application {

    public static void main(String[] args) {
        SpringApplication.run(Gh2971Application.class, args);
    }

    @Bean
    public Function<Message<?>,Message<?>> myFunction() {
        return v -> MessageBuilder.withPayload(KafkaNull.INSTANCE).build();
    }
}

Test:

// 
@ImportAutoConfiguration(TestChannelBinderConfiguration.class)
@SpringBootTest(properties = {"spring.cloud.function.definition=myFunction"})
class Gh2971ApplicationTests {

    @Value("${spring.cloud.stream.bindings.myFunction-in-0.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.myFunction-out-0.destination}")
    private String outputTopic;

    @Autowired
    private InputDestination input;
    @Autowired
    private OutputDestination output;

    @Test
    void test() {
        var message =
                MessageBuilder.withPayload(KafkaNull.INSTANCE)
                        .build();
        this.input.send(message, this.inputTopic);

        var received = this.output.receive(0L, this.outputTopic);
        assertThat(received)
                .isNotNull()
                .extracting(Message::getPayload)
                .isEqualTo(KafkaNull.INSTANCE);
    }
}

Application properties:

spring.application.name=gh-2971
spring.cloud.function.definition=myFunction
spring.cloud.stream.bindings.myFunction-in-0.destination=in
spring.cloud.stream.bindings.myFunction-out-0.destination=out
spring.cloud.stream.kafka.binder.consumer-properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.spring.json.use.type.headers=false

Expected Behavior:

The test passes.

Actual Behavior:

The test fails with the following message:

org.opentest4j.AssertionFailedError: 
expected: org.springframework.kafka.support.KafkaNull@c317472
 but was: [123, 125]
Expected :org.springframework.kafka.support.KafkaNull@c317472
Actual   :[123, 125]

Additional context:
Spring Cloud Stream version: 4.1.2
Kafka Binder version: 4.1.2
Test Binder: 4.1.2

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions