@joexjx
Contributor

@joexjx joexjx commented May 15, 2025

close: #9302

This bug was caused by EnvCommonOptions overriding SourceCommonOptions when setting the parallelism in the sparkRuntimeEnvironment.

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@github-actions github-actions bot added the core SeaTunnel core module label May 15, 2025
@joexjx joexjx changed the title [Bugfix] [Connector-V2] Fix SourceCommonOptions Parallelism Not Working with Spark Engine (#9302) [Bugfix] [Connector-V2] Fix SourceCommonOptions Parallelism Not Working with Spark Engine May 15, 2025
Comment on lines +95 to 101
envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
Dataset<Row> dataset =
        sparkRuntimeEnvironment
                .getSparkSession()
                .read()
                .format(SeaTunnelSource.class.getSimpleName())
                .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                .option(
Member

Thanks @joexjx. In SeaTunnel, however, we recommend configuring parallelism in env rather than in the source configuration. This helps keep configuration consistent across different engines. Maybe you can help update the documentation to make the description clearer.

Contributor Author

@joexjx joexjx May 16, 2025

I understand. However, this code does contain a very simple and obvious error. envOption is a HashMap that already includes EnvCommonOptions.PARALLELISM.key(). The .option(EnvCommonOptions.PARALLELISM.key(), parallelism) call is being overwritten by .options(envOption), which is why the properly configured parallelism (determined earlier based on whether it comes from env or source) isn't taking effect.
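
To make the overwrite concrete, here is a minimal sketch of the call order described above. `ReaderOptions` is a hypothetical stand-in that stores options in a plain map with last-write-wins semantics; it is not Spark's reader class, and the `"parallelism"` key is assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionOverwriteDemo {

    // Assumed key name, used only for this illustration.
    public static final String PARALLELISM_KEY = "parallelism";

    /** Hypothetical stand-in for a reader builder that keeps its options in a map. */
    static final class ReaderOptions {
        private final Map<String, String> options = new HashMap<>();

        ReaderOptions option(String key, String value) {
            options.put(key, value);
            return this;
        }

        ReaderOptions options(Map<String, String> more) {
            // Entries in `more` replace earlier entries that share the same key.
            options.putAll(more);
            return this;
        }

        String get(String key) {
            return options.get(key);
        }
    }

    /** Old call order: the resolved value is set first, then replaced by the env map. */
    public static String oldOrder(Map<String, String> envOption, int resolvedParallelism) {
        return new ReaderOptions()
                .option(PARALLELISM_KEY, String.valueOf(resolvedParallelism))
                .options(envOption)
                .get(PARALLELISM_KEY);
    }

    public static void main(String[] args) {
        Map<String, String> envOption = new HashMap<>();
        envOption.put(PARALLELISM_KEY, "4");
        // The resolved value 10 is lost because envOption is applied afterwards.
        System.out.println(oldOrder(envOption, 10));
    }
}
```

With an env map that already carries the key, the value passed through the single `option` call never reaches the reader.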

[Image: old code]

[Image: new code]

Contributor Author

@joexjx joexjx May 16, 2025

I removed .option(EnvCommonOptions.PARALLELISM.key(), ...) and updated envOptions with EnvCommonOptions.PARALLELISM in advance. This ensures that the parallelism value (which was already determined earlier based on whether it should be taken from env or source) takes effect correctly.
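
A rough sketch of that approach, under stated assumptions: the class and method names here are hypothetical, the `"parallelism"` key is assumed, and the precedence rule (source value first, then env, then a caller-supplied default) is my reading of this thread rather than a copy of the actual SeaTunnel code.

```java
import java.util.HashMap;
import java.util.Map;

public class EffectiveParallelism {

    // Assumed key name, used only for this illustration.
    public static final String PARALLELISM_KEY = "parallelism";

    /** Assumed precedence: source-level value, then env value, then the given default. */
    public static int resolve(
            Map<String, String> sourceConfig,
            Map<String, String> envConfig,
            int defaultParallelism) {
        String value = sourceConfig.containsKey(PARALLELISM_KEY)
                ? sourceConfig.get(PARALLELISM_KEY)
                : envConfig.get(PARALLELISM_KEY);
        return value == null ? defaultParallelism : Integer.parseInt(value);
    }

    /**
     * Returns a copy of the env options with the resolved parallelism written in,
     * so the map can be handed to the reader once and nothing overwrites it later.
     */
    public static Map<String, String> withResolvedParallelism(
            Map<String, String> sourceConfig,
            Map<String, String> envConfig,
            int defaultParallelism) {
        Map<String, String> merged = new HashMap<>(envConfig);
        int parallelism = resolve(sourceConfig, envConfig, defaultParallelism);
        merged.put(PARALLELISM_KEY, String.valueOf(parallelism));
        return merged;
    }
}
```

Writing the resolved value into the map before it is passed on means there is only one place where the parallelism is set, so call order can no longer change the result.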

@Hisoka-X
Member

It seems the commit history is messed up. Please rebase on dev.

Member

@Hisoka-X Hisoka-X left a comment

Please add a unit test case.

@Order(1)
public class SingleSplitSourceParallelismTest {
    @Test
    public void testSourceParallelismIsOneButEnvParallelismIsNotOne()
Member

The current test case is overly complex. We just need to ensure that the Dataset returned by the execute method of SourceExecuteProcessor has the expected parallelism.

SourceExecuteProcessor processor = new SourceExecuteProcessor();
// ... init

Assertions.assertEquals(10, processor.execute(xxx).get(0).getDataset().rdd().getNumPartitions());

Contributor Author

@Hisoka-X Thank you for the guidance, I have added a simpler unit test.

@Hisoka-X
Member

Waiting for the test cases to pass.

joexjx added 3 commits May 28, 2025 09:48
…ng with Spark Engine (apache#9302)

This bug was caused by EnvCommonOptions overriding SourceCommonOptions when setting the parallelism in the sparkRuntimeEnvironment.
…sm Overriding Environment Parallelism in Spark Engine.
…lelism Config Works And Overrides Env Config
@Hisoka-X Hisoka-X changed the title [Bugfix] [Connector-V2] Fix SourceCommonOptions Parallelism Not Working with Spark Engine [Fix][Spark] Fix source parallelism not working with Spark engine May 28, 2025
@hailin0 hailin0 merged commit bafd8e2 into apache:dev May 28, 2025
5 checks passed
dybyte pushed a commit to dybyte/seatunnel that referenced this pull request Jul 23, 2025

Labels

approved core SeaTunnel core module reviewed

Development

Successfully merging this pull request may close these issues.

[Bug] [Connector-v2] Bug title The Source Common Options parallelism not work

3 participants