[Fix][paimon-e2e] e2e test error #9467
Conversation
Pull Request Overview
This PR fixes an e2e test error by updating configuration file syntax for paimon connectors and adjusting the test class for proper handling of multiple engine types.
- Updated the row_rules configuration in two files by removing extra array wrappers.
- Modified the PaimonIT test class to disable the Spark and Flink engines and add startup initialization (a rough sketch of the resulting class shape follows).
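For orientation, here is a minimal sketch of that class shape, assuming the SeaTunnel e2e helpers referenced in this PR (DisabledOnContainer, EngineType, TestResource, AbstractPaimonIT) and using a placeholder reason string; the real PaimonIT additionally contains the actual test methods and container wiring.

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason = "placeholder, see the annotation text discussed below")
public class PaimonIT extends AbstractPaimonIT implements TestResource {

    @Override
    public void startUp() throws Exception {
        // Startup initialization added by this PR, e.g. preparing the table/warehouse
        // state the tests rely on; the concrete body is in the PR diff.
    }

    @Override
    public void tearDown() throws Exception {
        // Release whatever startUp created.
    }
}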
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf | Simplifies the row_rules configuration by removing extra brackets. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf | Similar simplification applied to the row_rules configuration. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java | Adjusted test class to extend AbstractPaimonIT, implement TestResource, and disable engines with a clear reason. |
Comments suppressed due to low confidence (2)
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf:47
- Confirm that consolidating the row_rules definition by removing the explicit array brackets aligns with the expected configuration schema and supports multiple rule entries if needed.
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf:48
- Ensure that the revised configuration structure for row_rules without an explicit array wrapper is compatible with the configuration parser, especially in cases where multiple rules might be used.
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason =
                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
Copilot AI · Jun 19, 2025
Consider rephrasing the disabledReason to clearly explain the limitation with auto-creating paimon tables on local worker nodes (e.g., detailing the save mode issue) for better clarity to future maintainers.
| "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error") | |
| "Spark and Flink engines cannot automatically create Paimon tables on local worker nodes due to limitations with the save mode feature. The save mode requires the table to exist beforehand, which can cause errors when running jobs on local file systems (e.g., Flink TaskManager).") |
|
Hi, why do we need to change this? This test seems to be able to run on Flink and Spark.
The data the reader actually reads is empty, so the assertion is wrong.
The data is written in the TaskManager, but the split info is read in the JobManager.
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason =
                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
It does not seem necessary here.
The fundamental reason is the lack of distributed storage. Why is the retrieved data empty? Because the data was written to the local node. The data write operation happens on the TaskManager node, while the JobManager is responsible for reading the split information. However, the JobManager cannot access the metadata of the data since there is no distributed storage system in place.
Another issue is why the assertion passes even though no data was read. This is because the assert statement was implemented incorrectly, allowing it to pass even when no data was actually retrieved.
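To make the second point concrete, here is a rough JUnit-style sketch (the readRows() helper and the empty list it returns are hypothetical, and this is not the actual assert sink code): a per-row check passes vacuously on an empty result unless the row count itself is also asserted.

import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions;

public class AssertPatternSketch {
    // Hypothetical stand-in for whatever rows the Paimon reader actually produced.
    static List<String> readRows() {
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        List<String> rows = readRows();

        // Flawed pattern: with an empty list the loop body never runs, so every
        // per-row check "passes" without inspecting any data.
        for (String row : rows) {
            Assertions.assertNotNull(row);
        }

        // Stricter pattern: also require that at least one row was actually read.
        Assertions.assertFalse(rows.isEmpty(), "reader returned no data");
    }
}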
It might be that there is an error with the assert.
Can we make jobmanager and taskmanager mount the same path to solve this problem?
I attempted to resolve the issue by mounting the same file path; however, the necessary changes would need to be made in the AbstractTestFlinkContainer class, which is a shared base class. Modifying it could affect multiple tests and introduces too much risk. As an alternative, I looked into how other test classes handle similar cases and decided that skipping the Spark and Flink engines is the most practical solution.
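For context, the container startup code in question (quoted below, presumably from the shared Flink container base class mentioned above) starts the JobManager and TaskManager without mounting any shared volume between them: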
@Override
public void startUp() throws Exception {
final String dockerImage = getDockerImage();
final String properties = String.join("\n", getFlinkProperties());
jobManager =
new GenericContainer<>(dockerImage)
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases("jobmanager")
.withExposedPorts()
.withEnv("FLINK_PROPERTIES", properties)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(dockerImage + ":jobmanager")))
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
copySeaTunnelStarterToContainer(jobManager);
copySeaTunnelStarterLoggingToContainer(jobManager);
jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081)));
taskManager =
new GenericContainer<>(dockerImage)
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases("taskmanager")
.withEnv("FLINK_PROPERTIES", properties)
.dependsOn(jobManager)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(
dockerImage + ":taskmanager")))
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(
".*Successful registration at resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
// execute extra commands
executeExtraCommands(jobManager);
}
It should be possible to mount a shared path that other programs will not use, for example /opt/seatunnel.
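A minimal Testcontainers-style sketch of that idea, assuming GenericContainer's withFileSystemBind API; the host directory parameter and the /opt/seatunnel container path are illustrative choices, not the exact change made here.

import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;

public class SharedPathSketch {
    // Bind the same host directory into both containers so files written by the
    // TaskManager become visible to the JobManager when it reads split metadata.
    static void bindSharedPath(
            GenericContainer<?> jobManager,
            GenericContainer<?> taskManager,
            String hostPath) {
        jobManager.withFileSystemBind(hostPath, "/opt/seatunnel", BindMode.READ_WRITE);
        taskManager.withFileSystemBind(hostPath, "/opt/seatunnel", BindMode.READ_WRITE);
    }
}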
done
cc @hawk9821
        primaryKey {
            name = "pk_id"
            columnNames = [pk_id]
        }
why remove this?
The pk_id in the generated fake data is randomly created, which may lead to duplicate entries. Due to the primary key constraint, the actual amount of inserted data may be less than the target. The reason it passed the previous assertion is because the assertion was written incorrectly.
Makes sense to me. cc @hawk9821
Data generated by FakeSource may indeed be repetitive. With a range of 0 to Integer.MAX_VALUE the probability of repetition is relatively low, so this case can run correctly. To ensure absolute certainty, I think we can enable FakeSource to support auto-incrementing primary keys. cc @WenDing-Y @Hisoka-X
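As a rough illustration of the difference (none of this is FakeSource's actual implementation): a monotonically increasing counter can never repeat, while independent random draws in [0, Integer.MAX_VALUE) can collide, and a primary-key table would then deduplicate the colliding rows on write.

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class PrimaryKeySketch {
    private final AtomicLong counter = new AtomicLong();

    // Unique by construction: every call returns a strictly larger value.
    public long nextAutoIncrementId() {
        return counter.incrementAndGet();
    }

    // May repeat across calls: two generated rows can draw the same pk_id.
    public int nextRandomId() {
        return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
    }
}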
Hisoka-X left a comment
LGTM. Thanks @WenDing-Y
Purpose of this pull request
fix #9466
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide