[Fix][Spark] Fix source parallelism not working with Spark engine #9319
Conversation
envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
Dataset<Row> dataset =
        sparkRuntimeEnvironment
                .getSparkSession()
                .read()
                .format(SeaTunnelSource.class.getSimpleName())
                .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                .option(
Thanks @joexjx. But in SeaTunnel, we recommend configuring parallelism in env rather than in the source configuration. This helps keep configuration consistent across different engines. Maybe you can help update the documentation to make the description clearer.
I understand. However, this code does contain a simple and obvious error: envOption is a HashMap that already includes EnvCommonOptions.PARALLELISM.key(), so the .option(EnvCommonOptions.PARALLELISM.key(), parallelism) call is overwritten by the later .options(envOption) call. That is why the properly resolved parallelism (determined earlier based on whether it comes from env or source) doesn't take effect.
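To illustrate the mechanism outside SeaTunnel: Spark's DataFrameReader collects options in a key/value map, so a later options(map) call replaces an earlier option(key, value) entry that uses the same key. The following standalone sketch (plain Java, with a hypothetical key and values) shows the same precedence:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration (not SeaTunnel code): reader options live in a map,
// so a later putAll/options(map) call wins over an earlier put/option(key, value)
// entry for the same key.
public class OptionPrecedenceSketch {
    public static void main(String[] args) {
        Map<String, String> readerOptions = new HashMap<>();

        // Equivalent of .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
        readerOptions.put("parallelism", "4");

        // envOption already carries the same key with a different value
        Map<String, String> envOption = new HashMap<>();
        envOption.put("parallelism", "1");

        // Equivalent of the later .options(envOption) call: same-key entries win
        readerOptions.putAll(envOption);

        // Prints 1 -- the value set via .option(...) has been overwritten
        System.out.println(readerOptions.get("parallelism"));
    }
}
```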
I removed the .option(EnvCommonOptions.PARALLELISM.key(), ...) call and instead put EnvCommonOptions.PARALLELISM into envOption in advance. This ensures that the parallelism value (already resolved earlier based on whether it should be taken from env or source) takes effect correctly.
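A minimal sketch of that change, following the diff shown above; the surrounding SeaTunnel context and the final .load() call are assumptions, not the verbatim fix:

```java
// Resolve parallelism first (source config if present, otherwise env),
// then carry it inside envOption so the single .options(envOption) call applies it.
envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));

Dataset<Row> dataset =
        sparkRuntimeEnvironment
                .getSparkSession()
                .read()
                .format(SeaTunnelSource.class.getSimpleName())
                // no separate .option(EnvCommonOptions.PARALLELISM.key(), ...) here;
                // the resolved value travels with envOption and is not overwritten
                .options(envOption)
                .load();
```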
Seems like the commit history is a mess. Please rebase on dev.
Hisoka-X
left a comment
Please add a unit test case.
Force-pushed from 6aaaba8 to a27d20c.
@Order(1)
public class SingleSplitSourceParallelismTest {
    @Test
    public void testSourceParallelismIsOneButEnvParallelismIsNotOne()
The current test case is a bit overly complex. We just need to ensure that the Dataset returned by the execute method of SourceExecuteProcessor has the expected parallelism.
SourceExecuteProcessor processor = new SourceExecuteProcessor();
// ... init
Assertions.assertEquals(10, processor.execute(xxx).get(0).getDataset().rdd().getNumPartitions());
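A generic, runnable sketch of that assertion pattern, using a local SparkSession and repartition(10) as a stand-in for SourceExecuteProcessor#execute (the class and method names below are illustrative, not the actual SeaTunnel test):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

// Illustration only: build a Dataset with a known partition count and verify it
// through rdd().getNumPartitions(), the same check suggested for the real test.
public class DatasetParallelismAssertionSketch {

    @Test
    public void datasetExposesExpectedPartitionCount() {
        SparkSession spark =
                SparkSession.builder()
                        .master("local[2]")
                        .appName("parallelism-assertion-sketch")
                        .getOrCreate();
        try {
            // Stand-in for the Dataset produced by SourceExecuteProcessor#execute
            Dataset<Row> dataset = spark.range(100).toDF().repartition(10);
            Assertions.assertEquals(10, dataset.rdd().getNumPartitions());
        } finally {
            spark.stop();
        }
    }
}
```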
@Hisoka-X Thank you for the guidance; I have added a simpler unit test.
Waiting for the test case to pass.
…ng with Spark Engine (apache#9302) This bug was caused by EnvCommonOptions overriding SourceCommonOptions when setting the parallelism in the sparkRuntimeEnvironment.
…sm Overriding Environment Parallelism in Spark Engine.
…lelism Config Works And Overrides Env Config


close: #9302
This bug was caused by EnvCommonOptions overriding SourceCommonOptions when setting the parallelism in the sparkRuntimeEnvironment.
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide