Greetings, everyone! I ran into an issue with the most recent release of Spring Cloud Stream + Kafka binder (release train 2023.0.2) when using KafkaNull (tombstone records) with the test binder. With 2023.0.1, all tests asserting the output of functions that return KafkaNull passed; with 2023.0.2, they fail. The output payload is now converted to byte[] and contains the byte equivalent of "{}".
Here are the relevant snippets (kept as minimal as possible):
build.gradle:
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.1'
    id 'io.spring.dependency-management' version '1.1.5'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}
repositories {
    mavenCentral()
}
ext {
    set('springCloudVersion', "2023.0.2")
}
dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation "org.springframework.cloud:spring-cloud-starter-stream-kafka"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    // testImplementation 'org.springframework.cloud:spring-cloud-starter-contract-stub-runner'
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
tasks.named('test') {
    useJUnitPlatform()
}
Function configuration:
@SpringBootApplication
public class Gh2971Application {
    public static void main(String[] args) {
        SpringApplication.run(Gh2971Application.class, args);
    }

    // Always emits a tombstone (KafkaNull payload), regardless of the input.
    @Bean
    public Function<Message<?>, Message<?>> myFunction() {
        return v -> MessageBuilder.withPayload(KafkaNull.INSTANCE).build();
    }
}
Test:
@ImportAutoConfiguration(TestChannelBinderConfiguration.class)
@SpringBootTest(properties = {"spring.cloud.function.definition=myFunction"})
class Gh2971ApplicationTests {
    @Value("${spring.cloud.stream.bindings.myFunction-in-0.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.myFunction-out-0.destination}")
    private String outputTopic;

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void test() {
        var message =
                MessageBuilder.withPayload(KafkaNull.INSTANCE)
                        .build();
        this.input.send(message, this.inputTopic);
        var received = this.output.receive(0L, this.outputTopic);
        assertThat(received)
                .isNotNull()
                .extracting(Message::getPayload)
                .isEqualTo(KafkaNull.INSTANCE);
    }
}
Application properties:
spring.application.name=gh-2971
spring.cloud.function.definition=myFunction
spring.cloud.stream.bindings.myFunction-in-0.destination=in
spring.cloud.stream.bindings.myFunction-out-0.destination=out
spring.cloud.stream.kafka.binder.consumer-properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.spring.json.use.type.headers=false
Expected Behavior:
The test passes.
Actual Behavior:
The test fails with the following message:
org.opentest4j.AssertionFailedError:
expected: org.springframework.kafka.support.KafkaNull@c317472
but was: [123, 125]
Expected :org.springframework.kafka.support.KafkaNull@c317472
Actual :[123, 125]
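For reference, the [123, 125] in the failure is just the UTF-8 encoding of "{}", which is why I describe the payload as the byte equivalent of an empty JSON object. A minimal sketch that shows this, independent of Spring (the class name PayloadBytes and the decode helper are only for illustration and are not part of the reproducer):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadBytes {
    // Decodes a raw message payload as UTF-8 text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The bytes reported by the failed assertion above.
        byte[] actual = {123, 125};
        // Prints: [123, 125] -> {}
        System.out.println(Arrays.toString(actual) + " -> " + decode(actual));
    }
}
```

So the KafkaNull payload appears to have been serialized as an empty JSON object somewhere between the function and the output destination; I have not tracked down where that conversion happens.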
Additional context:
Spring Cloud Stream version: 4.1.2
Kafka Binder version: 4.1.2
Test Binder: 4.1.2