[Bug] [Connector-V2] Postgres sink does not support composite primary keys in JDBC mode #8350

@ZhiYinZhang

Search before asking

  • I have searched the issues and found no similar issues.

What happened

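Syncing data from MySQL to PostgreSQL in batch mode fails when the JDBC sink is configured with a composite primary key (primary_keys=["yhbh","zhxh"]). Automatic table creation in the sink aborts with the PostgreSQL error: multiple primary keys for table "yh" are not allowed.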

SeaTunnel Version

2.3.8

SeaTunnel Config

env{
  parallelism=1
  job.mode="BATCH"
}
source {
  jdbc {
    url = "jdbc:mysql://192.168.17.201:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user="root"
    password="xxxxxxxx"
    table_list=[{table_path="test.yh"}]
  }
}

sink{
  jdbc{
    url = "jdbc:postgresql://192.168.17.201:5432/postgres"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "xxxxxxxx"
    database = "postgres"
    table = "myschema.${table_name}"
    generate_sink_sql = true
    primary_keys=["yhbh","zhxh"]
  }
}

Running Command

./bin/seatunnel.sh -c config/batch-mysql2pg.conf

Error Exception

Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:378)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:384)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:511)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table postgres.myschema.yh
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:185)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:425)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.savemode.JdbcSaveModeHandler.createTable(JdbcSaveModeHandler.java:48)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:115)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:74)
        at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:40)
        at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:376)
        ... 21 more
Caused by: org.postgresql.util.PSQLException: ERROR: multiple primary keys for table "yh" are not allowed
  Position: 648
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:620)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:167)
        ... 28 more

        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:518)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
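
For context on the innermost error: PostgreSQL reports "multiple primary keys for table ... are not allowed" when a single CREATE TABLE statement contains more than one PRIMARY KEY clause, for example when each key column is marked PRIMARY KEY inline. A composite key has to be declared once, as a table-level constraint listing all key columns. The sketch below contrasts the two shapes. It is illustrative only and is not SeaTunnel's PostgresCatalog code; the class name CompositeKeyDdl, its methods, and the column types are hypothetical, and the SQL that SeaTunnel actually generated is not shown in this report.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: NOT SeaTunnel's PostgresCatalog implementation.
class CompositeKeyDdl {

    // Quote an identifier the PostgreSQL way (double quotes, embedded quotes doubled).
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Shape PostgreSQL rejects for composite keys:
    // every key column carries its own inline PRIMARY KEY clause.
    static String inlineKeys(String table, Map<String, String> columns, List<String> keys) {
        String body = columns.entrySet().stream()
                .map(e -> quote(e.getKey()) + " " + e.getValue()
                        + (keys.contains(e.getKey()) ? " PRIMARY KEY" : ""))
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + table + " (" + body + ")";
    }

    // Shape PostgreSQL accepts: one table-level constraint naming all key columns.
    static String tableLevelKey(String table, Map<String, String> columns, List<String> keys) {
        String body = columns.entrySet().stream()
                .map(e -> quote(e.getKey()) + " " + e.getValue())
                .collect(Collectors.joining(", "));
        if (!keys.isEmpty()) {
            body += ", PRIMARY KEY ("
                    + keys.stream().map(CompositeKeyDdl::quote).collect(Collectors.joining(", "))
                    + ")";
        }
        return "CREATE TABLE " + table + " (" + body + ")";
    }

    public static void main(String[] args) {
        // Column names come from the config above; the types are assumptions.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("yhbh", "varchar(32)");
        columns.put("zhxh", "int4");
        List<String> keys = Arrays.asList("yhbh", "zhxh");

        System.out.println(inlineKeys("myschema.yh", columns, keys));
        System.out.println(tableLevelKey("myschema.yh", columns, keys));
    }
}
```

Running the two printed statements in psql against a scratch schema should show the first failing with the error from the trace and the second creating the table with a key on (yhbh, zhxh).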

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

Java 1.8.0

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
