[Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode #9598
Pull Request Overview
This PR fixes a blocking issue in the Kafka connector's stream mode by filtering out partitions whose leader is unavailable. When partitions in the Kafka cluster have leader=-1 (for example, because their disks are offline), KafkaConsumer APIs such as position() block, causing the connector to hang.
- Adds filtering logic to exclude partitions without leaders in streaming mode
- Prevents blocking behavior when Kafka partitions become unavailable
- Includes warning logs for filtered partitions to aid debugging
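The filtering described above can be illustrated with a small, self-contained sketch. It does not use the Kafka client library: `PartitionMeta` and `LeaderFilterSketch` are hypothetical stand-ins, and a leader id of -1 is assumed to mean "no leader", as in the PR description. The actual connector code may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for partition metadata; not a Kafka class. */
final class PartitionMeta {
    final String topic;
    final int partition;
    final int leaderId; // -1 models "no leader", as in the PR description

    PartitionMeta(String topic, int partition, int leaderId) {
        this.topic = topic;
        this.partition = partition;
        this.leaderId = leaderId;
    }
}

final class LeaderFilterSketch {
    static final int NO_LEADER = -1;

    /** Returns only the partitions that currently have a leader, warning about the rest. */
    static List<PartitionMeta> keepWithLeader(List<PartitionMeta> discovered) {
        List<PartitionMeta> usable = new ArrayList<>();
        for (PartitionMeta p : discovered) {
            if (p.leaderId == NO_LEADER) {
                // A real connector would use its logger here instead of stderr.
                System.err.printf(
                        "Partition %d of topic %s has no leader, skipping.%n",
                        p.partition, p.topic);
                continue;
            }
            usable.add(p);
        }
        return usable;
    }
}
```

Only the partitions returned by `keepWithLeader` would then be handed to the consumer, so a call such as position() is never issued against a leaderless partition.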
  log.info("Discovered topics: {}", topics);
  Collection<TopicPartition> partitions =
-         adminClient.describeTopics(topics).all().get().values().stream()
+         adminClient.describeTopics(topics).allTopicNames().get().values().stream()
Copilot
AI
Jul 21, 2025
The method call allTopicNames() appears incorrect. Based on the context and typical Kafka AdminClient API, this should likely be all() to get all topic descriptions.
Suggested change:
- adminClient.describeTopics(topics).allTopicNames().get().values().stream()
+ adminClient.describeTopics(topics).all().get().values().stream()
        == null) {
    log.warn(
            "Partition {} of topic {} has no leader.",
Copilot
AI
Jul 21, 2025
The null check logic is incorrect. The condition checks partitionInfo.leader() == null but should check if the leader ID is -1, as mentioned in the PR description. Leader being null and leader being -1 are different conditions.
Suggested change:
-         == null) {
-     log.warn(
-             "Partition {} of topic {} has no leader.",
+         .id() == -1) {
+     log.warn(
+             "Partition {} of topic {} has no valid leader (leader ID is -1).",
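The two conditions discussed in this comment can also be checked together. The following is a minimal sketch, assuming the leader may be reported either as null or as a node with id -1; `NodeStub` and `LeaderCheckSketch` are hypothetical stand-ins, not Kafka classes. The null check comes first because calling `.id()` on a null leader would throw a NullPointerException.

```java
/** Hypothetical stand-in for a broker node; not Kafka's Node class. */
final class NodeStub {
    private final int id;

    NodeStub(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

final class LeaderCheckSketch {
    static final int NO_LEADER_ID = -1;

    /** True when the leader is missing, whether reported as null or as id -1. */
    static boolean hasNoLeader(NodeStub leader) {
        // Null check first so that id() is never called on a null reference.
        return leader == null || leader.id() == NO_LEADER_ID;
    }
}
```

Which of the two representations the real client returns depends on the API and version in use, so treating both as "no leader" is the conservative choice in this sketch.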
Friendly ping, do you have time to take a look? @Hisoka-X @Carl-Zhou-CN
This continues the previous PR #8314.
At that time, we thought a switch needed to be added for this.
OK.
Done.
| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
| start_mode.end_timestamp | Long | No | - | The end time required for consumption mode to be "timestamp" in batch mode. |
| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
| ignore_no_leader_partition | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. |
When we set ignore_no_leader_partition to true, if the leader is subsequently restored, can we still read the corresponding data normally without any intervention?
> When we set ignore_no_leader_partition to true, if the leader is subsequently restored, can we still read the corresponding data normally without any intervention?
We need to configure the partition-discovery.interval-millis parameter to allow automatic recovery of read and write operations once the partition is restored.
When ignore_no_leader_partition is set to true, newly discovered partitions with leader = -1 (unavailable partitions) will be filtered out to avoid task failures. After the partitions are restored, the dynamic partition discovery mechanism will automatically detect and include these restored partitions, making them available for use.
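The recovery behaviour described here can be sketched as two discovery rounds. This is a simplified model with hypothetical names, not the connector's enumerator: partitions are represented as strings mapped to a leader id, -1 is assumed to mean "no leader", and each call to `discover` stands for one tick of partition-discovery.interval-millis.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of periodic discovery that skips leaderless partitions. */
final class DiscoveryRecoverySketch {
    private static final int NO_LEADER = -1;

    // Partitions already handed to readers.
    private final Set<String> assigned = new LinkedHashSet<>();

    /** Runs one discovery round and returns the partitions newly assigned in it. */
    Set<String> discover(Map<String, Integer> leadersByPartition) {
        Set<String> added = new LinkedHashSet<>();
        for (Map.Entry<String, Integer> e : leadersByPartition.entrySet()) {
            if (e.getValue() == NO_LEADER) {
                // Skipped in this round but not remembered, so it is re-checked next round.
                continue;
            }
            if (assigned.add(e.getKey())) {
                added.add(e.getKey());
            }
        }
        return added;
    }
}
```

In this model a partition that was skipped in one round is picked up in a later round once its leader id is valid again, which is why periodic discovery has to be enabled for recovery to happen without intervention.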
> We need to configure the partition-discovery.interval-millis parameter to allow automatic recovery of read and write operations once the partition is restored.
Could you add a check to make sure that partition-discovery.interval-millis is always configured when ignore_no_leader_partition is true?
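One possible shape for such a check is sketched below. The class and method names are hypothetical, and the sketch assumes that a non-positive interval (the documented default is -1) means periodic discovery is disabled; the validation actually added in the PR may look different.

```java
/** Hypothetical configuration check; option names follow the connector docs. */
final class IgnoreNoLeaderConfigCheck {

    /**
     * Rejects configurations where leaderless partitions are ignored but
     * periodic discovery is disabled, since skipped partitions would then
     * never be picked up again.
     */
    static void validate(boolean ignoreNoLeaderPartition, long discoveryIntervalMillis) {
        if (ignoreNoLeaderPartition && discoveryIntervalMillis <= 0) {
            throw new IllegalArgumentException(
                    "partition-discovery.interval-millis must be greater than 0 "
                            + "when ignore_no_leader_partition is true, but was "
                            + discoveryIntervalMillis);
        }
    }
}
```

Failing fast at configuration time makes the dependency between the two options visible to the user instead of silently dropping data from partitions that recover later.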
Hisoka-X
left a comment
Thanks @xiaochen-zhou for the update!
    }

    @Test
    void testIgnoreNoLeaderPartition() throws ExecutionException, InterruptedException {
The test case needs to verify this claim: "After the partitions are restored, the dynamic partition discovery mechanism will automatically detect and include these restored partitions, making them available for use."
> The test case need check for After the partitions are restored, the dynamic partition discovery mechanism will automatically detect and include these restored partitions, making them available for use.
Done.
Waiting for the test cases to pass.
Done. @Hisoka-X @Carl-Zhou-CN
Hisoka-X
left a comment
Thanks @xiaochen-zhou
Carl-Zhou-CN
left a comment
+1
Purpose of this pull request
In a Kafka cluster, the disks holding a partition can all go offline at the same time, which leaves the partition with leader=-1. While leader=-1, Kafka APIs such as KafkaConsumer.position() block. Therefore, we should filter out partitions with leader=-1.
Does this PR introduce any user-facing change?
no
How was this patch tested?
Existing tests.
Check list
New License Guide