Skip to content

Conversation

@hawk9821
Copy link
Contributor

Purpose of this pull request

source : mysql

CREATE TABLE `product` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` decimal(4,1) DEFAULT NULL COMMENT '单价',
  PRIMARY KEY (`id`)
)  COMMENT='商品表' ;

insert into product values (1,'product1', 101.1);
insert into product values (2,'product2', 999.9);

sink : paimon flink

CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'warehouse' = '/tmp/paimon'
);

use catalog `paimon`;

create database if not exists `test`;

CREATE TABLE  `test`.`product` (
  `id` int NOT NULL COMMENT '主键',
  `name` varchar(100)  COMMENT '商品名称',
  `price` decimal(4,2)  COMMENT '单价',
  CONSTRAINT `PK_id` PRIMARY KEY (`id`) NOT ENFORCED
)

conf

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/source?connectTimeout=5000"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "*****"
    password = "******"
    table_path = "source.product"
    split.size = 8096
    plugin_output = "table_info"
  }
}

sink {
  Paimon {
    plugin_input = "table_info"
    schema_save_mode = "RECREATE_SCHEMA"
    catalog_name = "seatunnel_test"
    warehouse = "/tmp/paimon"
    database = "test"
    table = "product"
    generate_sink_sql = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

Exception before the fix : the exception information is unclear, which is not convenient to locate the problem

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: java.lang.NullPointerException
	at org.apache.paimon.data.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:128)
	at org.apache.paimon.data.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:25)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:412)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more

Exception after the fix :

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-11], ErrorDescription:[deciaml type precision is incompatible. ] - `price` field value is: 101.1, except filed schema of sink is `price` DECIMAL(4, 1), but the filed in sink table which actual schema is `price` DECIMAL(4, 2) '单价'.Please check schema of sink table.
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.checkCanWriteWithSchema(RowConverter.java:542)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:392)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

RowConverter.reconvert(data, sourceType, sinkSchema);
} catch (Exception e) {
Assertions.assertEquals(
"ErrorCode:[PAIMON-11], ErrorDescription:[deciaml type precision is incompatible. ] - `f0` field value is: 123.4, except filed schema of sink is `f0` DECIMAL(4, 1), but the filed in sink table which actual schema is `f0` DECIMAL(4, 2).Please check schema of sink table.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we convert the precision of data dynamically before insert it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes ,but need to modify the table schema

@nielifeng nielifeng requested a review from Copilot June 18, 2025 06:12
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request fixes a bug in the Paimon connector to address a NullPointerException that occurs when the decimal type precision is incompatible. Key changes include:

  • Updating the RowConverter to validate and handle decimal field values properly.
  • Adding a dedicated test case in RowConverterTest to verify the new error message.
  • Introducing a new error code and updating error messages in PaimonConnectorErrorCode.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java Adds test case for decimal conversions and validates error message content
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java Modifies decimal conversion logic and error message generation to correctly detect incompatible precision
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java Introduces a new error code with an associated message for decimal type precision incompatibility

hawk9821 and others added 4 commits June 19, 2025 16:15
…che/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java

Co-authored-by: Copilot <[email protected]>
…che/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java

Co-authored-by: Copilot <[email protected]>
…che/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java

Co-authored-by: Copilot <[email protected]>
@corgy-w corgy-w merged commit 37762c9 into apache:dev Jun 24, 2025
5 checks passed
@hawk9821 hawk9821 deleted the paimon_write_decimal_npe branch June 24, 2025 11:06
chncaesar pushed a commit to chncaesar/seatunnel that referenced this pull request Jun 30, 2025
dybyte pushed a commit to dybyte/seatunnel that referenced this pull request Jul 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants