[Fix][Connectors-v2] fix dynamic bucket for paimon sink #9595
Pull Request Overview
This PR refactors Paimon's dynamic bucketing mechanism to address potential data duplication issues that may arise after modifying the parallelism level. The fix implements a new channel computer and bucket assigner factory to ensure consistent data distribution across different parallelism configurations.
- Introduces new bucket assignment logic with RowAssignerChannelComputer and PaimonBucketAssignerFactory
- Adds comprehensive test cases for dynamic bucket functionality with different parallelism settings
- Replaces the previous bucket assignment approach to prevent data duplication when parallelism changes
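The idea of routing rows consistently can be illustrated with a minimal sketch. It is not the PR's RowAssignerChannelComputer; the class `ChannelSketch`, the method `channelFor`, and the choice of `Objects.hash` are hypothetical and only show the general technique: a deterministic hash of the partition and primary key picks the task, so every row with the same key reaches the same bucket assigner for a given parallelism.

```java
import java.util.Objects;

// Illustrative only: names and hashing scheme are assumptions, not the PR's code.
class ChannelSketch {

    // Maps a row, identified by partition and primary key, to one of numChannels tasks.
    static int channelFor(String partition, String primaryKey, int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        // String.hashCode is specified by the JDK, so this hash is stable across JVM runs.
        int hash = Objects.hash(partition, primaryKey);
        // floorMod keeps the result in [0, numChannels) even for negative hashes.
        return Math.floorMod(hash, numChannels);
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {1, 2, 3}) {
            System.out.println(
                    "parallelism=" + parallelism
                            + " -> channel " + channelFor("p0", "id-42", parallelism));
        }
    }
}
```

Because the channel depends on the parallelism, a key may move to a different task after a parallelism change. Any real implementation therefore also has to make sure the new owner sees the key's existing bucket, which this sketch does not cover.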
Reviewed Changes
Copilot reviewed 19 out of 21 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| PaimonSinkWriter.java | Implements new dynamic bucket assignment logic using factory pattern and synchronized access |
| RowAssignerChannelComputer.java | New channel computer for consistent row distribution across parallel tasks |
| PaimonBucketAssignerFactory.java | Factory class to manage bucket assigners per table and task |
| PaimonBucketAssigner.java | Refactored to use HashBucketAssigner instead of SimpleHashBucketAssigner |
| Test configuration files | Multiple new test configurations for different parallelism scenarios |
| PaimonSinkDynamicBucketIT.java | Enhanced test suite with MySQL integration and parallelism validation |
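A factory that manages one assigner per table and task, as the table describes for PaimonBucketAssignerFactory, could look roughly like the sketch below. The class `AssignerRegistrySketch`, its `getOrCreate` method, and the string key format are hypothetical; the actual factory in the PR may be structured differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative only: a generic registry keyed by (table, task index).
class AssignerRegistrySketch<A> {

    private final Map<String, A> assigners = new ConcurrentHashMap<>();

    // Returns the assigner for this table and task, creating it on first use.
    // computeIfAbsent runs the creator at most once per key, even under concurrency.
    A getOrCreate(String tableId, int taskIndex, Supplier<A> creator) {
        String key = tableId + "#" + taskIndex; // assumes '#' does not occur in table ids
        return assigners.computeIfAbsent(key, k -> creator.get());
    }

    int size() {
        return assigners.size();
    }
}
```

Keying by both table and task index means writers for different tables, or different subtasks of the same table, never share assigner state.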
// When multiple threads call assigner.assign() simultaneously, they can
// corrupt the internal hash map structure, leading to the
// ArrayIndexOutOfBoundsException during rehashing operations
synchronized (bucketAssigner) {
Copilot AI, Jul 21, 2025
Synchronizing on the bucketAssigner instance may cause performance bottlenecks. Consider using a more granular locking mechanism or concurrent data structures to reduce contention.
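One way to read the "more granular locking" suggestion is to lock per partition instead of per assigner. The sketch below is a toy model, not Paimon's HashBucketAssigner: the class `PartitionLockSketch`, its fields, and the fill-then-advance bucket policy are all assumptions. It only applies if each partition's state really is independent, which would need to be checked against the actual assigner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a toy dynamic-bucket assigner with one lock per partition.
class PartitionLockSketch {

    // Per-partition state. HashMap is not thread-safe, so it is guarded by a lock.
    private static final class PartitionIndex {
        final Map<Integer, Integer> hashToBucket = new HashMap<>();
        int currentBucket = 0;
        int rowsInCurrentBucket = 0;
    }

    private final Map<String, PartitionIndex> partitions = new ConcurrentHashMap<>();
    private final int targetRowsPerBucket;

    PartitionLockSketch(int targetRowsPerBucket) {
        if (targetRowsPerBucket <= 0) {
            throw new IllegalArgumentException("targetRowsPerBucket must be positive");
        }
        this.targetRowsPerBucket = targetRowsPerBucket;
    }

    // Returns the bucket for a key hash; a known key always keeps its bucket.
    int assign(String partition, int keyHash) {
        PartitionIndex index = partitions.computeIfAbsent(partition, p -> new PartitionIndex());
        // Threads writing to different partitions lock different objects and do not contend.
        synchronized (index) {
            Integer existing = index.hashToBucket.get(keyHash);
            if (existing != null) {
                return existing;
            }
            if (index.rowsInCurrentBucket >= targetRowsPerBucket) {
                index.currentBucket++;
                index.rowsInCurrentBucket = 0;
            }
            index.rowsInCurrentBucket++;
            index.hashToBucket.put(keyHash, index.currentBucket);
            return index.currentBucket;
        }
    }
}
```

Whether this reduces contention in practice depends on how many partitions are written concurrently; with a single hot partition it behaves like the original single lock.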
80962c7 to 7b55968
7b55968 to 584d704
b8e5eec to ef0d544
…che/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java Co-authored-by: Jia Fan <[email protected]>
Purpose of this pull request
Refactor Paimon's dynamic bucketing mechanism to address potential data duplication issues that may arise after modifying the parallelism level.
Does this PR introduce any user-facing change?
How was this patch tested?
PaimonSinkDynamicBucketIT#testWriteForDifferentParallelism