
Commit fe56fc9

KAFKA-18269: Remove deprecated protocol APIs support (KIP-896, KIP-724) (#18218)
Included in this change:

1. Remove deprecated protocol api versions from the json files.
2. Remove fields that are no longer used from the json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
3. Remove record down-conversion support from KafkaApis.
4. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
5. Deprecate `TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and make the relevant configs (`message.downconversion.enable` and `log.message.downconversion.enable`) no-ops since down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
6. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
7. Simplify producer logic since it only supports the v2 record format now.
8. Fix tests so they don't exercise protocol api versions that have been removed.
9. Add upgrade note.

Testing:

1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
2. Java producers and consumers with versions 0.9-0.10.1 don't have api versions support and hence fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
3. The same seems to happen for the console producer 0.10.2, although it's unclear why since api versions should be supported. I will look into this separately; it's unlikely to be related to this PR.
4. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
5. Console producer and consumer 0.11.0 work fine; newer versions should naturally also work fine.
6. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
7. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
8. kcat 1.6.0-1.7.0 produce works fine.
9. kcat 1.7.1 (based on librdkafka 1.8.2) works fine for both consume and produce.
10. confluent-go-client (librdkafka based) 1.8.2 works fine for both consume and produce.
11. I will test more clients, but I don't think we need to block the PR on that.

Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported. Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis, but only what was required due to the schema changes). We can probably do that in master only as it does not change behavior.

Note that I did not touch `ignorable` fields even though some of them could have been changed. The reasoning is that this could result in incompatible changes for clients that use new protocol versions without setting such fields _if_ we don't manually validate their presence. I will file a JIRA ticket to look into this carefully for each case (i.e. if we do validate their presence for the appropriate versions, we can set them to ignorable=false in the json file).

[1] We would return this error if a fetch < v10 was used and the compression topic config was set to zstd, but we would not do the same for the case where zstd was used at the producer level (the most common case). Since there is no efficient way to do the check for the common case, I made it consistent for both by having no checks.
[2] `org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1`
[3] `METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent`
[4] `METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent`
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`

Reviewers: David Arthur <[email protected]>
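Since footnote [1] calls out producer-level zstd as the common case, here is a minimal, hedged sketch of that configuration using the public producer API; the bootstrap address and topic name are placeholders, and nothing in this snippet is specific to the commit beyond the fact that the broker no longer performs a zstd-specific check on the fetch path:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");            // producer-level zstd, the common case from [1]
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The broker no longer runs a zstd-specific version check on the fetch path (see footnote [1]).
            producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
        }
    }
}
```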
1 parent 288d4de commit fe56fc9

115 files changed

Lines changed: 474 additions & 1994 deletions

Note: this is a large commit, so only a subset of the 115 changed files is shown below.

clients/src/main/java/org/apache/kafka/clients/ApiVersions.java

Lines changed: 0 additions & 23 deletions
@@ -16,13 +16,8 @@
  */
 package org.apache.kafka.clients;

-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.ProduceRequest;
-
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;

 /**
  * Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@@ -33,7 +28,6 @@
 public class ApiVersions {

     private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
-    private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;

     // The maximum finalized feature epoch of all the node api versions.
     private long maxFinalizedFeaturesEpoch = -1;
@@ -50,7 +44,6 @@ public static class FinalizedFeaturesInfo {

     public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
         this.nodeApiVersions.put(nodeId, nodeApiVersions);
-        this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
         if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
             this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
             this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
@@ -59,7 +52,6 @@ public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions)

     public synchronized void remove(String nodeId) {
         this.nodeApiVersions.remove(nodeId);
-        this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
     }

     public synchronized NodeApiVersions get(String nodeId) {
@@ -74,19 +66,4 @@ public synchronized FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
         return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch, finalizedFeatures);
     }

-    private byte computeMaxUsableProduceMagic() {
-        // use a magic version which is supported by all brokers to reduce the chance that
-        // we will need to convert the messages when they are ready to be sent.
-        Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
-            .filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
-            .map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
-            .min(Byte::compare);
-        return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
-            knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
-    }
-
-    public synchronized byte maxUsableProduceMagic() {
-        return maxUsableProduceMagic;
-    }
-
 }
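With the magic-version tracking above removed, the producer code paths simply assume the v2 record format. A small illustration of the constants involved, both of which live in `org.apache.kafka.common.record.RecordBatch`:

```java
import org.apache.kafka.common.record.RecordBatch;

public class MagicValueExample {
    public static void main(String[] args) {
        // CURRENT_MAGIC_VALUE is the v2 record-format magic byte, the only format the producer now writes.
        System.out.println("current magic = " + RecordBatch.CURRENT_MAGIC_VALUE);
        System.out.println("is v2 = " + (RecordBatch.CURRENT_MAGIC_VALUE == RecordBatch.MAGIC_VALUE_V2));
    }
}
```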

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java

Lines changed: 10 additions & 28 deletions
@@ -111,33 +111,15 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
             Errors error = Errors.forCode(partition.errorCode());
             switch (error) {
                 case NONE:
-                    if (!partition.oldStyleOffsets().isEmpty()) {
-                        // Handle v0 response with offsets
-                        long offset;
-                        if (partition.oldStyleOffsets().size() > 1) {
-                            throw new IllegalStateException("Unexpected partitionData response of length " +
-                                partition.oldStyleOffsets().size());
-                        } else {
-                            offset = partition.oldStyleOffsets().get(0);
-                        }
-                        log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
-                            topicPartition, offset);
-                        if (offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
-                            OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(offset, null, Optional.empty());
-                            fetchedOffsets.put(topicPartition, offsetData);
-                        }
-                    } else {
-                        // Handle v1 and later response or v0 without offsets
-                        log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
-                            topicPartition, partition.offset(), partition.timestamp());
-                        if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
-                            Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
-                                ? Optional.empty()
-                                : Optional.of(partition.leaderEpoch());
-                            OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
-                                leaderEpoch);
-                            fetchedOffsets.put(topicPartition, offsetData);
-                        }
+                    log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
+                        topicPartition, partition.offset(), partition.timestamp());
+                    if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
+                        Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
+                            ? Optional.empty()
+                            : Optional.of(partition.leaderEpoch());
+                        OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
+                            leaderEpoch);
+                        fetchedOffsets.put(topicPartition, offsetData);
                     }
                     break;
                 case UNSUPPORTED_FOR_MESSAGE_FORMAT:
@@ -458,4 +440,4 @@ static class ListOffsetData {
             this.leaderEpoch = leaderEpoch;
         }
     }
-}
+}
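For context, a hedged sketch of the public consumer API that this code path serves; `offsetsForTimes` returns the offset, timestamp, and optional leader epoch per partition, mirroring the retained branch above. The broker address and topic name are placeholders:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForTimesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
            // Look up the first offset with a timestamp >= 0, i.e. the earliest record in the partition.
            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(Map.of(tp, 0L));
            OffsetAndTimestamp oat = result.get(tp);
            if (oat != null) {
                // leaderEpoch() is an Optional<Integer>, matching the UNKNOWN_EPOCH handling above.
                System.out.println(oat.offset() + " @ " + oat.timestamp() + ", epoch = " + oat.leaderEpoch());
            }
        }
    }
}
```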

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 1 addition & 1 deletion
@@ -1004,7 +1004,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
             setReadOnly(record.headers());
             Header[] headers = record.headers().toArray();

-            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
+            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
                     compression.type(), serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
             long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
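A standalone sketch of the same upper-bound size estimate, assuming `AbstractRecords.estimateSizeInBytesUpperBound` keeps the signature used in the diff above; the key, value, and compression type are placeholders:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class SizeEstimateExample {
    public static void main(String[] args) {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        // Upper bound on the serialized size of a v2 batch holding this single key/value pair, no headers.
        int upperBound = AbstractRecords.estimateSizeInBytesUpperBound(
            RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, key, value, Record.EMPTY_HEADERS);
        System.out.println("estimated upper bound: " + upperBound + " bytes");
    }
}
```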

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

Lines changed: 5 additions & 10 deletions
@@ -27,7 +27,6 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -344,8 +343,8 @@ public RecordAppendResult append(String topic,
         }

         if (buffer == null) {
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
+            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
+                RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
             // This call may block if we exhausted buffer space.
             buffer = free.allocate(size, maxTimeToBlock);
@@ -408,7 +407,7 @@ private RecordAppendResult appendNewBatch(String topic,
             return appendResult;
         }

-        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
+        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
         ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
         FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                 callbacks, nowMs));
@@ -419,12 +418,8 @@ private RecordAppendResult appendNewBatch(String topic,
         return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
     }

-    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
-        if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
-            throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
-                "support the required message format (v2). The broker must be version 0.11 or later.");
-        }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
+    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
+        return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
     }

     /**
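A minimal sketch of what the simplified `recordsBuilder` now does, building a v2-format batch directly; the buffer size and payload are placeholders, and this assumes the `MemoryRecords.builder` overload used in the diff above:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class RecordsBuilderExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Always the v2 record format: there is no longer a per-broker "max usable magic" to consult.
        MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME, 0L);
        builder.append(System.currentTimeMillis(),
            "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
        MemoryRecords records = builder.build();
        System.out.println("batch size: " + records.sizeInBytes() + " bytes");
    }
}
```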

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

Lines changed: 2 additions & 24 deletions
@@ -871,27 +871,10 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
             return;

         final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
-
-        // find the minimum magic version used when creating the record sets
-        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
-        for (ProducerBatch batch : batches) {
-            if (batch.magic() < minUsedMagic)
-                minUsedMagic = batch.magic();
-        }
         ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
         for (ProducerBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
             MemoryRecords records = batch.records();
-
-            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
-            // that the producer starts building the batch and the time that we send the request, and we may have
-            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
-            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
-            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
-            // not all support the same message format version. For example, if a partition migrates from a broker
-            // which is supporting the new magic version to one which doesn't, then we will need to convert.
-            if (!records.hasMatchingMagic(minUsedMagic))
-                records = batch.records().downConvert(minUsedMagic, 0, time).records();
             ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
             if (tpData == null) {
                 tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
@@ -904,18 +887,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }

         String transactionalId = null;
-
-        // When we use transaction V1 protocol in transaction we set the request version upper limit to
-        // LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1.
         boolean useTransactionV1Version = false;
         if (transactionManager != null && transactionManager.isTransactional()) {
             transactionalId = transactionManager.transactionalId();
-            if (!transactionManager.isTransactionV2Enabled()) {
-                useTransactionV1Version = true;
-            }
+            useTransactionV1Version = !transactionManager.isTransactionV2Enabled();
         }

-        ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
+        ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
             new ProduceRequestData()
                 .setAcks(acks)
                 .setTimeoutMs(timeout)

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 13 additions & 5 deletions
@@ -234,10 +234,18 @@ public class TopicConfig {
         "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
         "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
         "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+
+    /**
+     * @deprecated down-conversion is not possible in Apache Kafka 4.0 and newer, hence this configuration is a no-op,
+     * and it is deprecated for removal in Apache Kafka 5.0.
+     */
+    @Deprecated
     public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
-    public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
-        "down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
-        "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
-        "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" +
-        "does not apply to any message format conversion that might be required for replication to followers.";
+
+    /**
+     * @deprecated see {@link #MESSAGE_DOWNCONVERSION_ENABLE_CONFIG}.
+     */
+    @Deprecated
+    public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
+        "hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
 }
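For context, a hedged sketch of how the deprecated constant is typically referenced from the public admin API; on 4.0+ brokers the setting is accepted but has no effect. The broker address, topic name, and value are placeholders:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class DownconversionConfigExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            NewTopic topic = new NewTopic("example-topic", 1, (short) 1)
                // Compiles with a deprecation warning; the broker now treats the setting as a no-op.
                .configs(Map.of(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```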

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ public enum ApiKeys {
     private static boolean shouldRetainsBufferReference(Schema[] requestSchemas) {
         boolean requestRetainsBufferReference = false;
         for (Schema requestVersionSchema : requestSchemas) {
-            if (retainsBufferReference(requestVersionSchema)) {
+            if (requestVersionSchema != null && retainsBufferReference(requestVersionSchema)) {
                 requestRetainsBufferReference = true;
                 break;
             }
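The null check is needed because removing the oldest versions of an API leaves null entries in the per-version request schema array. A hedged illustration of the pattern only, not Kafka's actual array layout:

```java
import org.apache.kafka.common.protocol.types.Schema;

public class NullSchemaScanExample {
    public static void main(String[] args) {
        // Hypothetical layout: versions 0-1 were removed, so their slots are null; only version 2 has a schema.
        Schema[] requestSchemas = new Schema[3];
        requestSchemas[2] = new Schema();
        int definedVersions = 0;
        for (Schema schema : requestSchemas) {
            if (schema != null) { // the null check mirrors the fix above
                definedVersions++;
            }
        }
        System.out.println(definedVersions + " of " + requestSchemas.length + " versions still have a schema");
    }
}
```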

clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java

Lines changed: 1 addition & 24 deletions
@@ -224,29 +224,6 @@ public DescribeConfigsResponse(DescribeConfigsResponseData data) {
         this.data = data;
     }

-    // This constructor should only be used after deserialization, it has special handling for version 0
-    private DescribeConfigsResponse(DescribeConfigsResponseData data, short version) {
-        super(ApiKeys.DESCRIBE_CONFIGS);
-        this.data = data;
-        if (version == 0) {
-            for (DescribeConfigsResponseData.DescribeConfigsResult result : data.results()) {
-                for (DescribeConfigsResponseData.DescribeConfigsResourceResult config : result.configs()) {
-                    if (config.isDefault()) {
-                        config.setConfigSource(ConfigSource.DEFAULT_CONFIG.id);
-                    } else {
-                        if (result.resourceType() == ConfigResource.Type.BROKER.id()) {
-                            config.setConfigSource(ConfigSource.STATIC_BROKER_CONFIG.id);
-                        } else if (result.resourceType() == ConfigResource.Type.TOPIC.id()) {
-                            config.setConfigSource(ConfigSource.TOPIC_CONFIG.id);
-                        } else {
-                            config.setConfigSource(ConfigSource.UNKNOWN.id);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     @Override
     public DescribeConfigsResponseData data() {
         return data;
@@ -272,7 +249,7 @@ public Map<Errors, Integer> errorCounts() {
     }

     public static DescribeConfigsResponse parse(ByteBuffer buffer, short version) {
-        return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version), version);
+        return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version));
     }

     @Override
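Since DescribeConfigs v0 is gone, responses always carry the config source explicitly and the removed back-fill constructor is unnecessary. A hedged sketch of the public admin API that surfaces that source; the broker address and topic name are placeholders:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeConfigsExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "example-topic");
            Config config = admin.describeConfigs(List.of(topic)).all().get().get(topic);
            // Each entry reports its source (e.g. DYNAMIC_TOPIC_CONFIG, DEFAULT_CONFIG) straight from the broker.
            config.entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value() + " (source: " + entry.source() + ")"));
        }
    }
}
```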

clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java

Lines changed: 2 additions & 8 deletions
@@ -30,7 +30,6 @@

 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -137,7 +136,6 @@ private ListOffsetsRequest(ListOffsetsRequestData data, short version) {

     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short versionId = version();
         short errorCode = Errors.forException(e).code();

         List<ListOffsetsTopicResponse> responses = new ArrayList<>();
@@ -148,12 +146,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                 ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
                     .setErrorCode(errorCode)
                     .setPartitionIndex(partition.partitionIndex());
-                if (versionId == 0) {
-                    partitionResponse.setOldStyleOffsets(Collections.emptyList());
-                } else {
-                    partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
-                        .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
-                }
+                partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
+                    .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
                 partitions.add(partitionResponse);
             }
             topicResponse.setPartitions(partitions);
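For context, the user-facing counterpart of ListOffsets: a hedged sketch using the admin client, which on current protocol versions always returns a single offset/timestamp pair (plus optional leader epoch) per partition rather than the removed v0 old-style offset list. The broker address and topic are placeholders:

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
            ListOffsetsResult.ListOffsetsResultInfo info =
                admin.listOffsets(Map.of(tp, OffsetSpec.latest())).partitionResult(tp).get();
            // One offset/timestamp (and optional leader epoch) per partition.
            System.out.println("offset=" + info.offset() + " timestamp=" + info.timestamp()
                + " leaderEpoch=" + info.leaderEpoch());
        }
    }
}
```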
