1717package kafka .api
1818
1919import kafka .server .KafkaConfig
20+ import kafka .utils .TestInfoUtils
2021import org .apache .kafka .clients .producer .ProducerConfig
2122import org .apache .kafka .common .TopicPartition
2223import org .apache .kafka .common .config .TopicConfig
2324import org .junit .jupiter .api .Assertions .{assertEquals , assertNull , assertThrows }
24- import org .junit .jupiter .api .Test
25+ import org .junit .jupiter .params .ParameterizedTest
26+ import org .junit .jupiter .params .provider .ValueSource
2527
2628import java .util
2729import java .util .{Collections , Optional , Properties }
@@ -32,12 +34,15 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
3234
3335 override protected def brokerPropertyOverrides (properties : Properties ): Unit = {
3436 // legacy message formats are only supported with IBP < 3.0
35- properties.put(KafkaConfig .InterBrokerProtocolVersionProp , " 2.8" )
37+ // KRaft mode is not supported for inter.broker.protocol.version = 2.8, The minimum version required is 3.0-IV1"
38+ if (! isKRaftTest())
39+ properties.put(KafkaConfig .InterBrokerProtocolVersionProp , " 2.8" )
3640 }
3741
3842 @ nowarn(" cat=deprecation" )
39- @ Test
40- def testOffsetsForTimes (): Unit = {
43+ @ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedQuorumName )
44+ @ ValueSource (strings = Array (" zk" , " kraft" ))
45+ def testOffsetsForTimes (quorum : String ): Unit = {
4146 val numParts = 2
4247 val topic1 = " part-test-topic-1"
4348 val topic2 = " part-test-topic-2"
@@ -86,8 +91,22 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
8691 assertEquals(20 , timestampTopic1P1.timestamp)
8792 assertEquals(Optional .of(0 ), timestampTopic1P1.leaderEpoch)
8893
89- assertNull(timestampOffsets.get(new TopicPartition (topic2, 0 )), " null should be returned when message format is 0.9.0" )
90- assertNull(timestampOffsets.get(new TopicPartition (topic2, 1 )), " null should be returned when message format is 0.9.0" )
94+ if (! isKRaftTest()) {
95+ assertNull(timestampOffsets.get(new TopicPartition (topic2, 0 )), " null should be returned when message format is 0.9.0" )
96+ assertNull(timestampOffsets.get(new TopicPartition (topic2, 1 )), " null should be returned when message format is 0.9.0" )
97+ }
98+ else {
99+ // legacy message formats are supported for IBP version < 3.0 and KRaft runs on minimum version 3.0-IV1
100+ val timestampTopic2P0 = timestampOffsets.get(new TopicPartition (topic2, 0 ))
101+ assertEquals(40 , timestampTopic2P0.offset)
102+ assertEquals(40 , timestampTopic2P0.timestamp)
103+ assertEquals(Optional .of(0 ), timestampTopic2P0.leaderEpoch)
104+
105+ val timestampTopic2P1 = timestampOffsets.get(new TopicPartition (topic2, 1 ))
106+ assertEquals(60 , timestampTopic2P1.offset)
107+ assertEquals(60 , timestampTopic2P1.timestamp)
108+ assertEquals(Optional .of(0 ), timestampTopic2P1.leaderEpoch)
109+ }
91110
92111 val timestampTopic3P0 = timestampOffsets.get(new TopicPartition (topic3, 0 ))
93112 assertEquals(80 , timestampTopic3P0.offset)
@@ -98,8 +117,9 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
98117 }
99118
100119 @ nowarn(" cat=deprecation" )
101- @ Test
102- def testEarliestOrLatestOffsets (): Unit = {
120+ @ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedQuorumName )
121+ @ ValueSource (strings = Array (" zk" , " kraft" ))
122+ def testEarliestOrLatestOffsets (quorum : String ): Unit = {
103123 val topic0 = " topicWithNewMessageFormat"
104124 val topic1 = " topicWithOldMessageFormat"
105125 val prop = new Properties ()
0 commit comments