
KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups #17573

Merged
omkreddy merged 18 commits into apache:trunk from chirag-wadhwa5:KAFKA-16726
Nov 11, 2024

Conversation

@chirag-wadhwa5
Collaborator

This PR adds another dynamic config, share.auto.offset.reset, for share groups.

Reference: KAFKA-16726
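
Since this is a dynamic group config, it can be altered at runtime. A minimal sketch, assuming the group-level ConfigResource type (ConfigResource.Type.GROUP) and a hypothetical group id "my-share-group"; this illustrates the idea and is not code from this PR:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetShareAutoOffsetReset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // share.auto.offset.reset is set per group, like other dynamic configs.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("share.auto.offset.reset", "earliest"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
        }
    }
}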

@github-actions bot added core (Kafka Broker) and KIP-932 (Queues for Kafka) labels Oct 22, 2024
@chirag-wadhwa5 marked this pull request as ready for review October 23, 2024 18:36
Contributor

@apoorvmittal10 left a comment

Thanks for the PR, left some comments.

Comment thread core/src/main/java/kafka/server/share/ShareFetchUtils.java
Comment thread core/src/main/java/kafka/server/share/ShareFetchUtils.java
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/test/java/kafka/test/api/ShareConsumerTest.java Outdated
Comment thread core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala Outdated
return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
}

public enum ShareGroupAutoOffsetReset {
Contributor

Why not use the existing OffsetResetStrategy?

Member

I would keep them separate so we can evolve them separately.

Collaborator Author

Thanks for the review. It's better to use OffsetResetStrategy, so I'll remove the newly added enum.

Contributor

It's better to use OffsetResetStrategy, so I'll remove the newly added enum.

Did you miss it or change your mind?

What's the final decision? Are we keeping it? I would prefer to use OffsetResetStrategy, as I don't see any difference from the newly defined one. If we need an independent one in the future because of additional states, we can create it then; otherwise it's unnecessary.

Member

I expect that OffsetResetStrategy will evolve independently and I prefer to separate them now.

Contributor

@apoorvmittal10 Oct 28, 2024

Had a word with @AndrewJSchofield. Sure, let's have a separate one.

@chirag-wadhwa5 When we define just earliest and latest in this enum, do we still need the explicit validations specified for this config?

Collaborator Author

I don't think we would require any explicit validations on top of it. I have removed the unnecessary validations now.
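
For reference, a minimal sketch of what the trimmed enum could look like (names taken from the snippets in this thread; the merged code may differ). valueOf already throws for unknown names, so the enum itself validates the config value:

import java.util.Locale;

public enum ShareGroupAutoOffsetReset {
    LATEST, EARLIEST;

    public static ShareGroupAutoOffsetReset forStrategy(String strategy) {
        // valueOf throws IllegalArgumentException for any name other than
        // LATEST or EARLIEST, so no explicit validation is needed on top.
        return ShareGroupAutoOffsetReset.valueOf(strategy.toUpperCase(Locale.ROOT));
    }
}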

Comment thread share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated

Contributor

@apoorvmittal10 left a comment

Some comments to address.

Comment thread core/src/main/java/kafka/server/share/ShareFetchUtils.java
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
Contributor

Is it safe to return 0 from this method when timestampAndOffset is empty?

Collaborator Author

Thanks for the review. We could instead throw an error saying the latest offset was not found, or maybe initialize to the earliest offset in case the latest is not found. What would you suggest here @apoorvmittal10?

Contributor

@apoorvmittal10 Oct 28, 2024

If we send 0 when the timestamp is empty, but the first fetch offset in the log is at 10, will the fetch throw an exception? If yes, what happens next? And in what cases can the timestamp be empty from the replica manager?

Member

I think there is the possibility of an exception in the slightly unusual situation of a new leader which is lagging behind the current epoch. While I'm sure we're not going to bump into that right now, it is a real situation in the replica manager, I think. The interface here needs to be flexible enough to allow for an exception, and we need a plan for what to do if one occurs. @mumrah can you help here please? I guess we'd want to do some kind of retry, which is perhaps best achieved by aborting the share-partition initialisation and letting the next attempt to initialize the share-partition try again. The edges of the replication behaviour are a bit challenging, I think.

Collaborator Author

Hi @apoorvmittal10, when replicaManager.fetchOffsetForTimestamp is called with LATEST_TIMESTAMP, as per the current code only two outcomes are possible: either it throws an error, or it returns a populated response (not a None object, which is only possible when the same method is called with EARLIEST_TIMESTAMP). Error handling covers the case where an error is thrown, and beyond that I don't think we need any other check on whether the offset value is populated. If we do want such a check in place, we can't assume any other value instead; the only correct behaviour would be to fail the share-partition initialization.

Another point to note: when offsetForEarliestTimestamp is called with EARLIEST_TIMESTAMP, the current code returns 0 if the value returned from replicaManager.fetchOffsetForTimestamp is None, which again would be a wrong assumption if the start offset of the log is beyond 0. I believe the correct way is to fail the share-partition initialization here as well.
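
A sketch of the fail-fast alternative discussed above (hypothetical, mirroring the snippet earlier in this thread rather than the merged code): instead of defaulting to 0 when no offset comes back, the method aborts share-partition initialization.

static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
    Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
        topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP,
        new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.empty(), true).timestampAndOffsetOpt();
    if (timestampAndOffset.isEmpty()) {
        // Failing here lets the next share-fetch attempt retry the
        // share-partition initialization from a clean state.
        throw new IllegalStateException("Offset for latest timestamp not found for " + topicIdPartition);
    }
    return timestampAndOffset.get().offset;
}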

Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/test/java/kafka/server/share/SharePartitionManagerTest.java Outdated
Comment thread core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala Outdated
Contributor

@adixitconfluent left a comment

Did a partial review. Left some comments.

Comment thread core/src/test/java/kafka/server/share/SharePartitionTest.java
Comment thread core/src/test/java/kafka/server/share/SharePartitionTest.java Outdated
Comment thread core/src/test/java/kafka/test/api/ShareConsumerTest.java Outdated
Contributor

@apoorvmittal10 left a comment

Thanks for the changes. Some comments for clarification and changes. Mostly LGTM!

Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/test/java/kafka/server/share/SharePartitionManagerTest.java Outdated
Comment thread core/src/test/java/kafka/server/share/SharePartitionTest.java
Comment thread core/src/test/java/kafka/test/api/ShareConsumerTest.java Outdated
Comment thread core/src/test/java/kafka/test/api/ShareConsumerTest.java Outdated

@AndrewJSchofield requested review from mumrah and removed request for adixitconfluent October 28, 2024 12:00
@@ -128,4 +130,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
Member

Unfortunately, I think this is more complicated than it seems. If KIP-405 is enabled, then resolving EARLIEST_TIMESTAMP is asynchronous. It may be necessary to wait before there's an answer for the earliest offset. If we put that concern aside for this PR, we do need an issue to track this.


Comment thread core/src/test/scala/unit/kafka/server/KafkaApisTest.scala Outdated

Contributor

@apoorvmittal10 left a comment

LGTM; I'm just not understanding the change with the re-ordered tests, and there's an open query on returning 0 from the offsetForTimestamp method.

Comment on lines +145 to +157
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)

// Send the first share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
var acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
// Send the second share fetch request to fetch the records produced above
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
acknowledgementsMap = Map.empty
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
Contributor

Can't we define the offset reset strategy while creating the KafkaShareConsumer?

@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
Member

How about shareConsumerEarliest and shareConsumerLatest? Much more descriptive than 1 or 2.

}

public enum ShareGroupAutoOffsetReset {
LATEST, EARLIEST, NONE;
Member

This shouldn't really have the value "NONE", should it? The KIP mentions only LATEST and EARLIEST, I think.

Contributor

@apoorvmittal10 left a comment

LGTM, agree with Andrew's comments.

Contributor

@adixitconfluent left a comment

Thanks for the PR. LGTM.

val send: Seq[TopicIdPartition] = Seq(topicIdPartition)

// Send the first share fetch request to initialize the share partition
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
Contributor

Looks like some of these lines are repeated; is it possible to extract a function and reuse it?

Contributor

@omkreddy left a comment

@chirag-wadhwa5 Thanks for the PR. LGTM

@omkreddy merged commit 9db5ed0 into apache:trunk Nov 11, 2024
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (apache#17573)

This PR adds another dynamic config, share.auto.offset.reset, for share groups.

Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Abhinav Dixit <[email protected]>, Manikumar Reddy <[email protected]>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (apache#17573)

This PR adds another dynamic config, share.auto.offset.reset, for share groups.

Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Abhinav Dixit <[email protected]>, Manikumar Reddy <[email protected]>

Labels

ci-approved, core (Kafka Broker), KIP-932 (Queues for Kafka)


5 participants