-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Closed
Labels
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
Sync data from MySQL to postgres in batch mode can not set composite primary key.
SeaTunnel Version
2.3.8
SeaTunnel Config
env{
parallelism=1
job.mode="BATCH"
}
source {
jdbc {
url = "jdbc:mysql://192.168.17.201:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user="root"
password="xxxxxxxx"
table_list=[{table_path="test.yh"}]
}
}
sink{
jdbc{
url = "jdbc:postgresql://192.168.17.201:5432/postgres"
driver = "org.postgresql.Driver"
user = "postgres"
password = "xxxxxxxx"
database = "postgres"
table = "myschema.${table_name}"
generate_sink_sql = true
primary_keys=["yhbh","zhxh"]
}
}
Running Command
./bin/seatunnel.sh -c config/batch-mysql2pg.confError Exception
Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:378)
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:384)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:511)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table postgres.myschema.yh
at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:185)
at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:425)
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.savemode.JdbcSaveModeHandler.createTable(JdbcSaveModeHandler.java:48)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:115)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:74)
at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:40)
at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:376)
... 21 more
Caused by: org.postgresql.util.PSQLException: ERROR: multiple primary keys for table "yh" are not allowed
Position: 648
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:620)
at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:167)
... 28 more
at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:518)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Zeta or Flink or Spark Version
zeta
Java or Scala Version
java 1.8.0
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct