@xiaochen-zhou
Contributor

Purpose of this pull request

A Kafka cluster can reach a state where the disks holding a partition go offline at the same time, which leaves the partition with leader = -1. While leader = -1, Kafka APIs such as KafkaConsumer.position() block. This PR therefore filters out partitions with leader = -1.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Check list

Contributor

Copilot AI left a comment

Pull Request Overview

This PR fixes a blocking issue in Kafka connector's stream mode by filtering out partitions with unavailable leaders. When Kafka cluster partitions have leader=-1 (due to offline disks), KafkaConsumer APIs like position() will block, causing the connector to hang.

  • Adds filtering logic to exclude partitions without leaders in streaming mode
  • Prevents blocking behavior when Kafka partitions become unavailable
  • Includes warning logs for filtered partitions to aid debugging
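
A minimal sketch of the filtering idea summarized above, assuming a hypothetical `PartitionMeta` stand-in rather than the Kafka client types so that it runs without a broker. The class and method names are illustrative and are not taken from this PR's diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: PartitionMeta is a hypothetical stand-in, not a Kafka client class. */
final class LeaderFilterSketch {

    /** Partition metadata; leaderId is null when no leader is known. */
    static final class PartitionMeta {
        final String topic;
        final int partition;
        final Integer leaderId;

        PartitionMeta(String topic, int partition, Integer leaderId) {
            this.topic = topic;
            this.partition = partition;
            this.leaderId = leaderId;
        }
    }

    /** Returns "topic-partition" names for partitions that currently have a leader. */
    static List<String> readablePartitions(List<PartitionMeta> discovered) {
        List<String> readable = new ArrayList<>();
        for (PartitionMeta meta : discovered) {
            if (meta.leaderId == null) {
                // Skip instead of blocking later, and log what was dropped.
                System.err.printf("Partition %d of topic %s has no leader, skipping.%n",
                        meta.partition, meta.topic);
                continue;
            }
            readable.add(meta.topic + "-" + meta.partition);
        }
        return readable;
    }
}
```

Only the partitions returned by `readablePartitions` would be handed to consumers, so a call such as `position()` is never issued against a leaderless partition.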

  log.info("Discovered topics: {}", topics);
  Collection<TopicPartition> partitions =
- adminClient.describeTopics(topics).all().get().values().stream()
+ adminClient.describeTopics(topics).allTopicNames().get().values().stream()

Copilot AI Jul 21, 2025

The method call allTopicNames() appears incorrect. Based on the context and typical Kafka AdminClient API, this should likely be all() to get all topic descriptions.

Suggested change
- adminClient.describeTopics(topics).allTopicNames().get().values().stream()
+ adminClient.describeTopics(topics).all().get().values().stream()

Comment on lines 344 to 346
== null) {
log.warn(
"Partition {} of topic {} has no leader.",

Copilot AI Jul 21, 2025

The null check logic is incorrect. The condition checks partitionInfo.leader() == null but should check if the leader ID is -1, as mentioned in the PR description. Leader being null and leader being -1 are different conditions.

Suggested change
- == null) {
- log.warn(
- "Partition {} of topic {} has no leader.",
+ .id() == -1) {
+ log.warn(
+ "Partition {} of topic {} has no valid leader (leader ID is -1).",
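
The two conditions discussed in this comment (a null leader and a leader whose ID is -1) can both be covered by one defensive check. The sketch below assumes a hypothetical `BrokerNode` stand-in, not the Kafka client's node type, and is not the code merged in this PR.

```java
/** Sketch only: BrokerNode is a hypothetical stand-in for a broker node object. */
final class NoLeaderCheckSketch {

    /** Sentinel ID used in this thread to mean "no leader". */
    static final int NO_LEADER_ID = -1;

    static final class BrokerNode {
        final int id;

        BrokerNode(int id) {
            this.id = id;
        }
    }

    /** True when the leader reference is absent or carries the -1 sentinel ID. */
    static boolean hasNoLeader(BrokerNode leader) {
        // Check null first so that reading the id can never throw.
        return leader == null || leader.id == NO_LEADER_ID;
    }
}
```

Checking for null before reading the ID matters: a check that only reads the ID would throw a NullPointerException whenever the leader reference itself is missing.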

@xiaochen-zhou
Contributor Author

Friendly ping: do you have time to take a look, @Hisoka-X @Carl-Zhou-CN?

@Carl-Zhou-CN
Member

#8314 Continue the previous PR

@Carl-Zhou-CN
Member

#8314 Continue the previous PR

At that time, we thought a switch needed to be added to this

@xiaochen-zhou
Contributor Author

> #8314 Continue the previous PR
>
> At that time, we thought a switch needed to be added to this

OK.

@xiaochen-zhou
Contributor Author

> #8314 Continue the previous PR
>
> At that time, we thought a switch needed to be added to this

Done.

| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
| start_mode.end_timestamp | Long | No | - | The end time required for consumption mode to be "timestamp" in batch mode. |
| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
| ignore_no_leader_partition | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. |
Member

When we set ignore_no_leader_partition to true, if the leader is subsequently restored, can we still read the corresponding data normally without any intervention?

Contributor Author

> When we set ignore_no_leader_partition to true, if the leader is subsequently restored, can we still read the corresponding data normally without any intervention?

We need to configure the partition-discovery.interval-millis parameter to allow automatic recovery of read and write operations once the partition is restored.

When ignore_no_leader_partition is set to true, newly discovered partitions with leader = -1 (unavailable partitions) will be filtered out to avoid task failures. After the partitions are restored, the dynamic partition discovery mechanism will automatically detect and include these restored partitions, making them available for use.
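
The recovery behavior described here can be sketched as a small simulation. `DiscoverySketch` and its map-based input are hypothetical; in the connector, the leader information would come from the admin client on each discovery interval. The point illustrated is that a skipped partition is not recorded as assigned, so a later round picks it up once it has a leader.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Sketch only: simulates periodic partition discovery without a Kafka cluster. */
final class DiscoverySketch {

    /** Partitions already handed to readers. */
    private final Set<Integer> assigned = new TreeSet<>();
    private final boolean ignoreNoLeaderPartition;

    DiscoverySketch(boolean ignoreNoLeaderPartition) {
        this.ignoreNoLeaderPartition = ignoreNoLeaderPartition;
    }

    /**
     * Runs one discovery round. The map goes from partition ID to leader ID,
     * where a null value means "no leader". Returns the partitions newly
     * assigned in this round.
     */
    Set<Integer> discover(Map<Integer, Integer> leadersByPartition) {
        Set<Integer> added = new TreeSet<>();
        for (Map.Entry<Integer, Integer> entry : leadersByPartition.entrySet()) {
            boolean noLeader = entry.getValue() == null;
            if (noLeader && ignoreNoLeaderPartition) {
                // Not remembered as assigned, so a later round can pick it up.
                continue;
            }
            if (assigned.add(entry.getKey())) {
                added.add(entry.getKey());
            }
        }
        return added;
    }
}
```

With the flag enabled, a round that sees partition 1 without a leader assigns only the healthy partitions; once a later round reports a leader for partition 1, that round returns it as newly assigned.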

Member

> We need to configure the partition-discovery.interval-millis parameter to allow automatic recovery of read and write operations once the partition is restored.

Could you add a check to make sure that partition-discovery.interval-millis is always configured when ignore_no_leader_partition is true?
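
One possible shape for such a check, assuming that a non-positive interval (such as the documented default of -1) means periodic discovery is disabled. `OptionCheckSketch.validate` is a hypothetical helper, not the connector's actual validation code.

```java
/** Sketch only: a hypothetical option check, not the connector's real validation. */
final class OptionCheckSketch {

    /**
     * Rejects the combination "ignore leaderless partitions" plus "no periodic
     * discovery", because skipped partitions would then never be picked up again.
     */
    static void validate(boolean ignoreNoLeaderPartition, long discoveryIntervalMillis) {
        // Assumption: values <= 0 disable discovery (the documented default is -1).
        if (ignoreNoLeaderPartition && discoveryIntervalMillis <= 0) {
            throw new IllegalArgumentException(
                    "partition-discovery.interval-millis must be greater than 0 "
                            + "when ignore_no_leader_partition is true");
        }
    }
}
```

Failing at configuration time makes the dependency between the two options visible to the user before the job starts, rather than leaving restored partitions silently unread.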

Member

@Hisoka-X Hisoka-X left a comment

Thanks @xiaochen-zhou for the update!

}

@Test
void testIgnoreNoLeaderPartition() throws ExecutionException, InterruptedException {
Member

The test case needs to check this: after the partitions are restored, the dynamic partition discovery mechanism automatically detects and includes them, making them available for use.

Contributor Author

> The test case needs to check this: after the partitions are restored, the dynamic partition discovery mechanism automatically detects and includes them, making them available for use.

Done.

@Hisoka-X
Member

Hisoka-X commented Aug 4, 2025

Waiting for the test cases to pass.

@xiaochen-zhou
Contributor Author

> Waiting for the test cases to pass.

Done. @Hisoka-X @Carl-Zhou-CN

Member

@Hisoka-X Hisoka-X left a comment

Member

@Carl-Zhou-CN Carl-Zhou-CN left a comment

+1

@Carl-Zhou-CN Carl-Zhou-CN merged commit bd24fa7 into apache:dev Aug 4, 2025
6 checks passed