
Conversation

@WenDing-Y
Contributor

Purpose of this pull request

fix #9466

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@github-actions github-actions bot added the e2e label Jun 18, 2025
@WenDing-Y WenDing-Y changed the title from "Fix paimon e2e test error" to "[Fix][paimon-e2e] e2e test error" Jun 18, 2025
@nielifeng nielifeng requested a review from Copilot June 19, 2025 03:01
Contributor

Copilot AI left a comment


Pull Request Overview

This PR fixes an e2e test error by updating the configuration file syntax for the Paimon connector tests and adjusting the test class to handle multiple engine types properly.

  • Updated the row_rules configuration in two files by removing extra array wrappers (see the sketch below).
  • Modified the PaimonIT test class to disable Spark and Flink engines and add startup initialization.
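
For illustration, a hypothetical sketch of the kind of change described in the first bullet — the rule contents are invented, not copied from the PR diff. Per the review notes below, the explicit array wrapper around the rule object is dropped:

    # before (hypothetical reconstruction): explicit array wrapper
    row_rules = [
      {
        rule_type = MIN_ROW
        rule_value = 100
      }
    ]

    # after: array brackets removed
    row_rules = {
      rule_type = MIN_ROW
      rule_value = 100
    }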

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File — Description

  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf — Simplifies the row_rules configuration by removing extra brackets.
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf — The same simplification applied to the row_rules configuration.
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java — Adjusts the test class to extend AbstractPaimonIT, implement TestResource, and disable the Spark and Flink engines with a clear reason.
Comments suppressed due to low confidence (2)

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf:47

  • Confirm that consolidating the row_rules definition by removing the explicit array brackets aligns with the expected configuration schema and supports multiple rule entries if needed.

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf:48

  • Ensure that the revised configuration structure for row_rules without an explicit array wrapper is compatible with the configuration parser, especially in cases where multiple rules might be used.

Comment on lines 38 to 41

    value = {},
    type = {EngineType.SPARK, EngineType.FLINK},
    disabledReason =
            "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")

Copilot AI Jun 19, 2025


Consider rephrasing the disabledReason to clearly explain the limitation with auto-creating paimon tables on local worker nodes (e.g., detailing the save mode issue) for better clarity to future maintainers.

Suggested change
"Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
"Spark and Flink engines cannot automatically create Paimon tables on local worker nodes due to limitations with the save mode feature. The save mode requires the table to exist beforehand, which can cause errors when running jobs on local file systems (e.g., Flink TaskManager).")

@corgy-w
Contributor

corgy-w commented Jun 19, 2025

Hi, why do we need to change this? This test seems to be able to run on Flink and Spark.

@WenDing-Y
Contributor Author

The data the reader actually reads is empty; the assertion is wrong.

@WenDing-Y
Contributor Author

The data is written in the TaskManager, but the split info is read in the JobManager.

Comment on lines 38 to 41
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason =
"Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
Contributor


It seems unnecessary here.

Contributor Author


The fundamental reason is the lack of distributed storage. Why is the retrieved data empty? Because the data was written to the local node. The data write operation happens on the TaskManager node, while the JobManager is responsible for reading the split information. However, the JobManager cannot access the metadata of the data since there is no distributed storage system in place.

Another issue is why the assertion passes even though no data was read. This is because the assert statement was implemented incorrectly, allowing it to pass even when no data was actually retrieved.


Contributor


It might be that there is an error in the assert.

Member


Can we make the JobManager and TaskManager mount the same path to solve this problem?

Contributor Author


I attempted to resolve the issue by mounting the same file path; however, the necessary changes would need to be made in the AbstractTestFlinkContainer class, a shared base class. Modifying it could affect multiple tests and introduce too much risk. As an alternative, I looked into how other test classes handle similar cases and decided that skipping the Spark and Flink engines is the most practical solution.

    @Override
    public void startUp() throws Exception {
        final String dockerImage = getDockerImage();
        final String properties = String.join("\n", getFlinkProperties());
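        // Launch the Flink JobManager container and wait until its resource manager starts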
        jobManager =
                new GenericContainer<>(dockerImage)
                        .withCommand("jobmanager")
                        .withNetwork(NETWORK)
                        .withNetworkAliases("jobmanager")
                        .withExposedPorts()
                        .withEnv("FLINK_PROPERTIES", properties)
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger(dockerImage + ":jobmanager")))
                        .waitingFor(
                                new LogMessageWaitStrategy()
                                        .withRegEx(".*Starting the resource manager.*")
                                        .withStartupTimeout(Duration.ofMinutes(2)));
        copySeaTunnelStarterToContainer(jobManager);
        copySeaTunnelStarterLoggingToContainer(jobManager);

        jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081)));

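        // Launch the TaskManager container; it must register with the JobManager before tests run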
        taskManager =
                new GenericContainer<>(dockerImage)
                        .withCommand("taskmanager")
                        .withNetwork(NETWORK)
                        .withNetworkAliases("taskmanager")
                        .withEnv("FLINK_PROPERTIES", properties)
                        .dependsOn(jobManager)
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger(
                                                dockerImage + ":taskmanager")))
                        .waitingFor(
                                new LogMessageWaitStrategy()
                                        .withRegEx(
                                                ".*Successful registration at resource manager.*")
                                        .withStartupTimeout(Duration.ofMinutes(2)));

        Startables.deepStart(Stream.of(jobManager)).join();
        Startables.deepStart(Stream.of(taskManager)).join();
        // execute extra commands
        executeExtraCommands(jobManager);
    }


Member


It should be possible to add a shared path that other programs will not use, for example /opt/seatunnel.
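
A minimal sketch of that idea, assuming the Testcontainers setup shown in the startUp method above; the host directory and helper name are illustrative, not taken from the merged change:

    import org.testcontainers.containers.BindMode;
    import org.testcontainers.containers.GenericContainer;

    public class SharedPaimonPath {
        // Hypothetical shared directory on the CI host; both containers see the same files here.
        private static final String HOST_SHARED_DIR = "/tmp/seatunnel-shared";
        private static final String CONTAINER_SHARED_DIR = "/opt/seatunnel";

        static GenericContainer<?> withSharedWarehouse(GenericContainer<?> container) {
            // Bind-mount the same host directory into each container, so a Paimon table
            // written by the TaskManager is visible to the JobManager when it reads splits.
            return container.withFileSystemBind(
                    HOST_SHARED_DIR, CONTAINER_SHARED_DIR, BindMode.READ_WRITE);
        }
    }

Applying the same bind to both the jobManager and taskManager containers gives them a common view of the Paimon warehouse directory without touching paths that other tests rely on.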

Contributor Author


done

@WenDing-Y WenDing-Y requested a review from corgy-w June 23, 2025 03:32
@Hisoka-X
Member

cc @hawk9821

Comment on lines -45 to -48
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
Member


Why remove this?

Contributor Author


The pk_id in the generated fake data is created randomly, which may lead to duplicate entries. Due to the primary key constraint, the actual amount of inserted data may be less than the target. It passed the previous assertion only because the assertion was written incorrectly.

Member


Makes sense to me. cc @hawk9821

Contributor

@hawk9821 hawk9821 Jun 26, 2025


Data generated by FakeSource may indeed be repetitive, but over the range 0 to Integer.MAX_VALUE the probability of repetition is relatively low, so this case can run correctly. To ensure absolute certainty, I think we can extend FakeSource to support auto-incrementing primary keys. cc @WenDing-Y @Hisoka-X
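
A minimal sketch contrasting the two key strategies, assuming a hypothetical generator class; the names here are illustrative, not FakeSource's actual API:

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicLong;

    public class PrimaryKeyGenerators {
        private final AtomicLong counter = new AtomicLong();

        // Random keys in [0, Integer.MAX_VALUE) can collide, so under a primary key
        // constraint the table may hold fewer rows than the configured row count.
        long randomKey() {
            return ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
        }

        // Auto-incrementing keys are unique by construction, so the number of
        // inserted rows always matches the number of generated rows.
        long autoIncrementKey() {
            return counter.incrementAndGet();
        }
    }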


@WenDing-Y WenDing-Y requested a review from Hisoka-X June 27, 2025 14:34
Member

@Hisoka-X Hisoka-X left a comment


LGTM. Thanks @WenDing-Y

@corgy-w corgy-w merged commit 5b700a8 into apache:dev Jun 28, 2025
4 checks passed
@WenDing-Y WenDing-Y deleted the fix-paimon-e2e branch July 1, 2025 13:49
dybyte pushed a commit to dybyte/seatunnel that referenced this pull request Jul 23, 2025


Development

Successfully merging this pull request may close these issues.

[Bug] [connector-e2e] test class have error
