@hawk9821
Contributor

Purpose of this pull request

Refactor Paimon's dynamic bucketing mechanism to address potential data duplication issues that may arise after modifying the parallelism level.

Does this PR introduce any user-facing change?

How was this patch tested?

PaimonSinkDynamicBucketIT#testWriteForDifferentParallelism

Check list

@nielifeng nielifeng requested a review from Copilot July 21, 2025 03:35
Contributor

Copilot AI left a comment

Pull Request Overview

This PR refactors Paimon's dynamic bucketing mechanism to address potential data duplication issues that may arise after modifying the parallelism level. The fix implements a new channel computer and bucket assigner factory to ensure consistent data distribution across different parallelism configurations.

  • Introduces new bucket assignment logic with RowAssignerChannelComputer and PaimonBucketAssignerFactory
  • Adds comprehensive test cases for dynamic bucket functionality with different parallelism settings
  • Replaces the previous bucket assignment approach to prevent data duplication when parallelism changes
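
To illustrate why a parallelism change can produce duplicates, here is a minimal sketch under stated assumptions; it is not the PR's code. The class `ChannelRoutingSketch`, the helpers `channelFor` and `bucketFor`, and the in-memory map standing in for a persisted key-to-bucket index are all hypothetical, and "one bucket per channel" is a deliberate simplification. The idea shown: if a bucket is derived only from the current task count, an existing key can land in a different bucket after rescaling, whereas consulting the recorded assignment first keeps the key in its original bucket.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names come from SeaTunnel or Paimon.
final class ChannelRoutingSketch {

    /** Picks the task (channel) that owns a key hash for the given parallelism. */
    static int channelFor(int keyHash, int numChannels) {
        // floorMod keeps the result in [0, numChannels) even for negative hashes.
        return Math.floorMod(keyHash, numChannels);
    }

    /**
     * Resolves a bucket for a key. The map is an assumed stand-in for a
     * persisted key-hash -> bucket index that survives job restarts.
     */
    static int bucketFor(Map<Integer, Integer> persistedIndex, int keyHash, int parallelism) {
        Integer known = persistedIndex.get(keyHash);
        if (known != null) {
            // Existing key: reuse its bucket regardless of the current parallelism.
            return known;
        }
        // New key: simplification, the owning channel doubles as the bucket id.
        int bucket = channelFor(keyHash, parallelism);
        persistedIndex.put(keyHash, bucket);
        return bucket;
    }

    public static void main(String[] args) {
        int keyHash = 5;

        // Naive routing: the target changes when parallelism goes from 2 to 3.
        System.out.println("naive, parallelism 2: " + channelFor(keyHash, 2));
        System.out.println("naive, parallelism 3: " + channelFor(keyHash, 3));

        // Index-first routing: the second lookup reuses the recorded bucket.
        Map<Integer, Integer> index = new HashMap<>();
        System.out.println("indexed, parallelism 2: " + bucketFor(index, keyHash, 2));
        System.out.println("indexed, parallelism 3: " + bucketFor(index, keyHash, 3));
    }
}
```

In this sketch the same primary key written before and after a rescale ends up in one bucket only when the recorded assignment is checked first; writing it to two different buckets is the kind of duplication the PR description refers to.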

Reviewed Changes

Copilot reviewed 19 out of 21 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| PaimonSinkWriter.java | Implements new dynamic bucket assignment logic using factory pattern and synchronized access |
| RowAssignerChannelComputer.java | New channel computer for consistent row distribution across parallel tasks |
| PaimonBucketAssignerFactory.java | Factory class to manage bucket assigners per table and task |
| PaimonBucketAssigner.java | Refactored to use HashBucketAssigner instead of SimpleHashBucketAssigner |
| Test configuration files | Multiple new test configurations for different parallelism scenarios |
| PaimonSinkDynamicBucketIT.java | Enhanced test suite with MySQL integration and parallelism validation |

// When multiple threads call assigner.assign() simultaneously, they can
// corrupt the internal hash map structure, leading to the
// ArrayIndexOutOfBoundsException during rehashing operations
synchronized (bucketAssigner) {

Copilot AI Jul 21, 2025

Synchronizing on the bucketAssigner instance may cause performance bottlenecks. Consider using a more granular locking mechanism or concurrent data structures to reduce contention.
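
For illustration, the "concurrent data structures" alternative could look roughly like the sketch below. This is an assumption-laden example, not a drop-in replacement: `ConcurrentAssignerSketch` is a hypothetical class, its modulo-based bucket choice is a placeholder, and it does not model the internals of the assigner used in this PR. It only shows how `ConcurrentHashMap.computeIfAbsent` lets threads working on different keys proceed without sharing one monitor while still assigning each key exactly once.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not the assigner used by the connector.
final class ConcurrentAssignerSketch {

    // Thread-safe map from key hash to the bucket chosen for it.
    private final ConcurrentHashMap<Integer, Integer> keyHashToBucket = new ConcurrentHashMap<>();
    private final int numBuckets;

    ConcurrentAssignerSketch(int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        this.numBuckets = numBuckets;
    }

    /** Returns the bucket for a key hash, assigning one atomically on first sight. */
    int assign(int keyHash) {
        // computeIfAbsent runs the mapping function at most once per key,
        // so concurrent callers with the same key observe the same bucket.
        return keyHashToBucket.computeIfAbsent(keyHash, h -> Math.floorMod(h, numBuckets));
    }

    /** Number of distinct key hashes seen so far. */
    int size() {
        return keyHashToBucket.size();
    }
}
```

When the assigner comes from a third-party library and its internal state is not thread-safe, as the code comment above suggests, a `synchronized` block around the call is the simpler and safer choice; a structure like this one only helps if the shared state itself can be replaced.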

@hawk9821 hawk9821 force-pushed the paimon_dynamic_bucket branch 2 times, most recently from 80962c7 to 7b55968 Compare July 22, 2025 15:18
@hawk9821 hawk9821 force-pushed the paimon_dynamic_bucket branch from 7b55968 to 584d704 Compare July 24, 2025 14:36
@hawk9821 hawk9821 force-pushed the paimon_dynamic_bucket branch from b8e5eec to ef0d544 Compare July 25, 2025 13:37
hawk9821 and others added 2 commits July 29, 2025 08:59
…che/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java

Co-authored-by: Jia Fan <[email protected]>
@corgy-w corgy-w merged commit d29a531 into apache:dev Jul 31, 2025
5 checks passed
@hawk9821 hawk9821 deleted the paimon_dynamic_bucket branch August 11, 2025 06:18
3 participants