Skip to content

Commit 8cdf1ab

Browse files
KAFKA-15738: Adding KRaft support in ConsumerWithLegacyMessageFormatIntegrationTest (#15171)
Reviewers: Manikumar Reddy <[email protected]>
1 parent da6f052 commit 8cdf1ab

1 file changed

Lines changed: 28 additions & 8 deletions

File tree

core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package kafka.api
1818

1919
import kafka.server.KafkaConfig
20+
import kafka.utils.TestInfoUtils
2021
import org.apache.kafka.clients.producer.ProducerConfig
2122
import org.apache.kafka.common.TopicPartition
2223
import org.apache.kafka.common.config.TopicConfig
2324
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows}
24-
import org.junit.jupiter.api.Test
25+
import org.junit.jupiter.params.ParameterizedTest
26+
import org.junit.jupiter.params.provider.ValueSource
2527

2628
import java.util
2729
import java.util.{Collections, Optional, Properties}
@@ -32,12 +34,15 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
3234

3335
override protected def brokerPropertyOverrides(properties: Properties): Unit = {
3436
// legacy message formats are only supported with IBP < 3.0
35-
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
37+
// KRaft mode is not supported for inter.broker.protocol.version = 2.8, The minimum version required is 3.0-IV1"
38+
if(!isKRaftTest())
39+
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
3640
}
3741

3842
@nowarn("cat=deprecation")
39-
@Test
40-
def testOffsetsForTimes(): Unit = {
43+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
44+
@ValueSource(strings = Array("zk", "kraft"))
45+
def testOffsetsForTimes(quorum: String): Unit = {
4146
val numParts = 2
4247
val topic1 = "part-test-topic-1"
4348
val topic2 = "part-test-topic-2"
@@ -86,8 +91,22 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
8691
assertEquals(20, timestampTopic1P1.timestamp)
8792
assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)
8893

89-
assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null should be returned when message format is 0.9.0")
90-
assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null should be returned when message format is 0.9.0")
94+
if(!isKRaftTest()) {
95+
assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null should be returned when message format is 0.9.0")
96+
assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null should be returned when message format is 0.9.0")
97+
}
98+
else {
99+
// legacy message formats are supported for IBP version < 3.0 and KRaft runs on minimum version 3.0-IV1
100+
val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
101+
assertEquals(40, timestampTopic2P0.offset)
102+
assertEquals(40, timestampTopic2P0.timestamp)
103+
assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
104+
105+
val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1))
106+
assertEquals(60, timestampTopic2P1.offset)
107+
assertEquals(60, timestampTopic2P1.timestamp)
108+
assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
109+
}
91110

92111
val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
93112
assertEquals(80, timestampTopic3P0.offset)
@@ -98,8 +117,9 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
98117
}
99118

100119
@nowarn("cat=deprecation")
101-
@Test
102-
def testEarliestOrLatestOffsets(): Unit = {
120+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
121+
@ValueSource(strings = Array("zk", "kraft"))
122+
def testEarliestOrLatestOffsets(quorum: String): Unit = {
103123
val topic0 = "topicWithNewMessageFormat"
104124
val topic1 = "topicWithOldMessageFormat"
105125
val prop = new Properties()

0 commit comments

Comments
 (0)