[Bug] [mysql-cdc-restore] mysql-cdc multi-table synchronization cannot restore normally #8455

@jw-itq

Search before asking

  • I have searched the issues and found no similar issues.

What happened

I have over 200 tables being synchronized via MySQL-CDC to both StarRocks and MySQL (via JDBC). I created savepoints during both the snapshot and incremental phases, and the savepoint status shows 'SAVEPOINT_DONE'. However, restoring with 'seatunnel.sh -r jobid' fails with the following errors.
For StarRocks synchronization, the error is:
'The source action name Source[0]-MySQL-CDC is not found in the checkpoint keys [ActionStateKey - pipeline-1 [Sink[0]-StarRocks-**'
For JDBC synchronization tasks, the error is generally:
'io.debezium.DebeziumException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT. Error code: 1236;'
I have confirmed that the source binlog has not been deleted. Please help me investigate this issue. Note that when I use MySQL-CDC for single-table synchronization, it works normally.
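
For anyone triaging the 1236 error: with GTID auto-positioning, MySQL rejects the connection when the server's `gtid_purged` set contains GTIDs that the connecting client does not report as already executed. The sketch below mimics the idea behind `GTID_SUBTRACT` offline so the two sets can be compared by hand. It is only a minimal illustration: the UUID and ranges are made up, tagged GTIDs are not handled, and how the GTID set is obtained from a SeaTunnel checkpoint is not shown here.

```python
from typing import Dict, List, Tuple

Interval = Tuple[int, int]  # inclusive [start, end] transaction numbers


def _merge(intervals: List[Interval]) -> List[Interval]:
    """Sort intervals and merge overlapping or adjacent ones."""
    merged: List[Interval] = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def parse_gtid_set(text: str) -> Dict[str, List[Interval]]:
    """Parse 'uuid:1-5:7,uuid2:1-3' into {uuid: [(1, 5), (7, 7)], ...}."""
    result: Dict[str, List[Interval]] = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *ranges = part.split(":")
        intervals = result.setdefault(uuid.lower(), [])
        for r in ranges:
            start, _, end = r.partition("-")
            intervals.append((int(start), int(end or start)))
    return {uuid: _merge(iv) for uuid, iv in result.items()}


def gtid_subtract(a: Dict[str, List[Interval]],
                  b: Dict[str, List[Interval]]) -> Dict[str, List[Interval]]:
    """Return the GTIDs that are in a but not in b."""
    out: Dict[str, List[Interval]] = {}
    for uuid, intervals in a.items():
        remaining: List[Interval] = []
        for start, end in intervals:
            cursor = start
            for b_start, b_end in b.get(uuid, []):
                if b_end < cursor:
                    continue
                if b_start > end:
                    break
                if b_start > cursor:
                    remaining.append((cursor, b_start - 1))
                cursor = max(cursor, b_end + 1)
                if cursor > end:
                    break
            if cursor <= end:
                remaining.append((cursor, end))
        if remaining:
            out[uuid] = remaining
    return out


if __name__ == "__main__":
    # Hypothetical values: the server has purged 1-100, the restored
    # position only claims 1-50, so 51-100 can no longer be served.
    uuid = "3e11fa47-71ca-11e1-9e33-c80aa9429562"
    purged = parse_gtid_set(f"{uuid}:1-100")
    restored = parse_gtid_set(f"{uuid}:1-50")
    print(gtid_subtract(purged, restored))
```

An empty result means every purged GTID is already covered by the restored position. A non-empty result lists the ranges the server would have to send but no longer has, which is the situation the 1236 message describes.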

SeaTunnel Version

dev

SeaTunnel Config

{
    "env" : {
        "parallelism" : 1,
        "checkpoint.interval" : 20000,
        "checkpoint.timeout" : 600000,
        "job.mode" : "STREAMING"
    },
    "source" : [
        {
            "plugin_output" : "test",
            "username" : "******",
            "password" : "******",
            "base-url" : "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true",
            "include.schema.changes" : true,
            "table-names" : [
                "test.user1",
                "test.user2"
            ],
            "plugin_name" : "MySQL-CDC"
        }
    ],
    "transform" : [],
    "sink" : [
        {
            "plugin_input" : "test",
            "nodeUrls" : [
                "127.0.0.1:8030"
            ],
            "username" : "******",
            "password" : "******",
            "database" : "test",
            "table" : "${table_name}",
            "base-url" : "jdbc:mysql://127.0.0.1:9030/test",
            "max_retries" : 3,
            "enable_upsert_delete" : true,
            "schema_save_mode" : "RECREATE_SCHEMA",
            "data_save_mode" : "DROP_DATA",
            "save_mode_create_template" : "\n    CREATE TABLE IF NOT EXISTS test.`${table_name}` (\n        ${rowtype_primary_key},\n        ${rowtype_fields}\n        ) ENGINE=OLAP\n        PRIMARY KEY (${rowtype_primary_key})\n        DISTRIBUTED BY HASH (${rowtype_primary_key})\n        PROPERTIES (\n                \"replication_num\" = \"1\",\n                \"in_memory\" = \"false\",\n                \"enable_persistent_index\" = \"true\",\n                \"replicated_storage\" = \"true\",\n                \"compression\" = \"LZ4\"\n          )\n    ",
            "plugin_name" : "StarRocks"
        }
    ]
}

Running Command

seatunnel.sh -c mysql-to-starrocks.conf -r jobid --async

Error Exception

mysql-cdc to starrocks error:
[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error, 

[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed 

[] 2025-01-05 21:41:34,412 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:227)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:50)
Caused by: java.lang.IllegalArgumentException: The source action name Source[0]-MySQL-CDC is not found in the checkpoint keys [ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user1], ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user2],*****].
	at org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.getSourceCheckpoint(MultipleTableJobConfigParser.java:798)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:399)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:236)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:164)
	... 2 more
 
[] 2025-01-05 21:41:34,413 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
===============================================================================
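
With 200+ tables the key list in the `IllegalArgumentException` message is long, so it can help to summarize it before reading it. The helper below is a rough sketch that only parses the message text as it appears in this report. It assumes each key is printed as `ActionStateKey - pipeline-N [name]`; the function names are hypothetical and nothing here calls SeaTunnel itself.

```python
import re
from collections import Counter
from typing import List


def checkpoint_action_names(message: str) -> List[str]:
    """Extract the action names printed as 'ActionStateKey - pipeline-N [name]'."""
    # Names contain brackets themselves (e.g. 'Sink[0]-...'), so the closing
    # bracket is only accepted when followed by ',', ']' or end of text.
    pattern = r"ActionStateKey - pipeline-\d+ \[(.+?)\](?=,|\]|$)"
    return re.findall(pattern, message)


def summarize_action_kinds(message: str) -> Counter:
    """Count keys by the prefix before the first '[', e.g. 'Sink' or 'Source'."""
    return Counter(name.split("[", 1)[0] for name in checkpoint_action_names(message))


if __name__ == "__main__":
    # Message copied from the stack trace above (the list is truncated there).
    msg = (
        "The source action name Source[0]-MySQL-CDC is not found in the "
        "checkpoint keys [ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user1], "
        "ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user2],*****]."
    )
    print(checkpoint_action_names(msg))
    print(summarize_action_kinds(msg))
```

On the visible part of the message only `Sink` keys show up. Because the list is truncated with `*****` in this report, that alone does not prove the checkpoint has no `Source` key; running the helper on the full message would.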



mysql-cdc to jdbc error:
2025-01-05 11:02:06,706 INFO  [.c.i.s.ClientStatisticsService] [main] - Client statistics is enabled with period 5 seconds.
{"jobId":927600520204386307,"jobName":"mysql-cdc-to-jdbc","jobStatus":"FAILED","submitTime":1736044736929,"finishTime":1736045095061,"pipelineStateMapperMap":{"PipelineLocation(jobId=927600520204386307, pipelineId=1)":{"pipelineStatus":"FAILED","executionStateMap":{"TaskGroupLocation{jobId=927600520204386307, pipelineId=1, taskGroupId=1}":"CANCELED","TaskGroupLocation{jobId=927600520204386307, pipelineId=1, taskGroupId=30000}":"FAILED"}}},"errorMessage":"java.lang.RuntimeException: One or more fetchers have encountered exception\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)\n\tat org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)\n\tat org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)\n\tat org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)\n\tat org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)\n\tat org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)\n\t... 5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)\n\tat io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1239)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)\n\t... 1 more\nCaused by: io.debezium.DebeziumException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT. Error code: 1236; SQLSTATE: HY000.\n\tat io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1194)\n\t... 
5 more\nCaused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT.\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)\n\t... 3 more\n"}

Zeta or Flink or Spark Version

zeta

Java or Scala Version

jdk8 and jdk11

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
