Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
I have over 200 tables being synchronized via MySQL-CDC to both StarRocks and MySQL (via JDBC). I created savepoints during both the incremental and snapshot phases, and the savepoint status shows 'SAVEPOINT_DONE'. However, when trying to recover using 'seatunnel.sh -r jobid', I encounter errors:
For StarRocks synchronization, the error is:
'The source action name Source[0]-MySQL-CDC is not found in the checkpoint keys [ActionStateKey - pipeline-1 [Sink[0]-StarRocks-**'
For JDBC synchronization tasks, the error is generally:
'io.debezium.DebeziumException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT. Error code: 1236;'
I have confirmed that the source binlog has not been deleted. Please help me investigate this issue. Note that when I use MySQL-CDC for single-table synchronization, it works normally.
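One way to double-check the "binlog has not been deleted" claim against error 1236: MySQL raises this error when the server's purged GTID set contains transactions that are missing from the GTID set the client presents on connect. The sketch below is only an illustration of that comparison and is not SeaTunnel or Debezium code. The function names `parse_gtid_set` and `gtid_subtract` and the sample UUID are hypothetical. It assumes the purged set is read from the source with `SELECT @@GLOBAL.gtid_purged` and that the restored GTID set has been obtained from the savepoint or logs by some other means. It expands ranges into Python sets, so it is only suitable for small examples.

```python
def parse_gtid_set(text):
    """Parse 'uuid:1-5:7,uuid2:3' into {uuid: set of transaction ids}."""
    result = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *intervals = part.split(":")
        ids = result.setdefault(uuid.lower(), set())
        for interval in intervals:
            # An interval is either 'N' or 'N-M' (inclusive on both ends).
            start, _, end = interval.partition("-")
            ids.update(range(int(start), int(end or start) + 1))
    return result


def gtid_subtract(a, b):
    """Return the GTIDs in a that are not in b, as {uuid: sorted ids}."""
    parsed_a, parsed_b = parse_gtid_set(a), parse_gtid_set(b)
    diff = {}
    for uuid, ids in parsed_a.items():
        missing = ids - parsed_b.get(uuid, set())
        if missing:
            diff[uuid] = sorted(missing)
    return diff


# Hypothetical values for illustration only.
SERVER_UUID = "3e11fa47-71ca-11e1-9e33-c80aa9429562"
purged_on_source = f"{SERVER_UUID}:1-100"

# A restored set that covers everything purged: nothing is missing.
print(gtid_subtract(purged_on_source, f"{SERVER_UUID}:1-150"))

# An empty or partial restored set: the source would reject the connection,
# even though the binlog files needed from the real position still exist.
print(len(gtid_subtract(purged_on_source, "")[SERVER_UUID]))
print(gtid_subtract(purged_on_source, f"{SERVER_UUID}:1-98"))
```

If the difference is non-empty for the GTID set restored from the savepoint, the failure would point at the restored offset rather than at binlog retention. Whether that is what happens in the multi-table case is an open question here, not something the logs above confirm.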
SeaTunnel Version
dev
SeaTunnel Config
{
"env" : {
"parallelism" : 1,
"checkpoint.interval" : 20000,
"checkpoint.timeout" : 600000,
"job.mode" : "STREAMING"
},
"source" : [
{
"plugin_output" : "test",
"username" : "******",
"password" : "******",
"base-url" : "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true",
"include.schema.changes" : true,
"table-names" : [
"test.user1",
"test.user2"
],
"plugin_name" : "MySQL-CDC"
}
],
"transform" : [],
"sink" : [
{
"plugin_input" : "test",
"nodeUrls" : [
"127.0.0.1:8030"
],
"username" : "******",
"password" : "******",
"database" : "test",
"table" : "${table_name}",
"base-url" : "jdbc:mysql://127.0.0.1:9030/test",
"max_retries" : 3,
"enable_upsert_delete" : true,
"schema_save_mode" : "RECREATE_SCHEMA",
"data_save_mode" : "DROP_DATA",
"save_mode_create_template" : "\n CREATE TABLE IF NOT EXISTS test.`${table_name}` (\n ${rowtype_primary_key},\n ${rowtype_fields}\n ) ENGINE=OLAP\n PRIMARY KEY (${rowtype_primary_key})\n DISTRIBUTED BY HASH (${rowtype_primary_key})\n PROPERTIES (\n \"replication_num\" = \"1\",\n \"in_memory\" = \"false\",\n \"enable_persistent_index\" = \"true\",\n \"replicated_storage\" = \"true\",\n \"compression\" = \"LZ4\"\n )\n ",
"plugin_name" : "StarRocks"
}
]
}
Running Command
seatunnel.sh -c mysql-to-starrocks.conf -r jobid --async
Error Exception
mysql-cdc to starrocks error:
[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,
[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues
[] 2025-01-05 21:41:34,405 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
[] 2025-01-05 21:41:34,412 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:227)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:50)
Caused by: java.lang.IllegalArgumentException: The source action name Source[0]-MySQL-CDC is not found in the checkpoint keys [ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user1], ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user2],*****].
at org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.getSourceCheckpoint(MultipleTableJobConfigParser.java:798)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:399)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:236)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:164)
... 2 more
[] 2025-01-05 21:41:34,413 ERROR org.apache.seatunnel.core.starter.SeaTunnel -
===============================================================================
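To make the StarRocks failure above easier to reason about, here is a minimal sketch of the kind of lookup the message implies: the restore path looks for the source action name among the action names stored in the checkpoint and fails if it is absent. This is a hypothetical reconstruction from the error text, not the actual `MultipleTableJobConfigParser.getSourceCheckpoint` implementation. The helper names `action_names` and `find_source_state` and the source key in the second call are assumptions made for illustration.

```python
import re


def action_names(checkpoint_keys):
    """Extract the action name from the outermost [...] of each key string."""
    names = []
    for key in checkpoint_keys:
        # Keys look like:
        # "ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user1]"
        match = re.search(r"\[(.+)\]$", key)
        if match:
            names.append(match.group(1))
    return names


def find_source_state(source_action_name, checkpoint_keys):
    """Fail the same way the restore does when the source key is absent."""
    if source_action_name not in action_names(checkpoint_keys):
        raise ValueError(
            f"The source action name {source_action_name} is not found "
            f"in the checkpoint keys {checkpoint_keys}"
        )
    return source_action_name


# Keys as reported in the stack trace: only sink states are present.
reported_keys = [
    "ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user1]",
    "ActionStateKey - pipeline-1 [Sink[0]-StarRocks-test.user2]",
]

try:
    find_source_state("Source[0]-MySQL-CDC", reported_keys)
except ValueError as err:
    print(err)

# With a source key present (hypothetical), the lookup succeeds.
keys_with_source = reported_keys + [
    "ActionStateKey - pipeline-1 [Source[0]-MySQL-CDC]"
]
print(find_source_state("Source[0]-MySQL-CDC", keys_with_source))
```

Read this way, the visible part of the reported key list contains no entry for `Source[0]-MySQL-CDC` at all. The list is truncated in the log, so this is not conclusive, but it suggests checking whether the source state was ever written to the savepoint in the multi-table case or was stored under a different action name.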
mysql-cdc to jdbc error:
2025-01-05 11:02:06,706 INFO [.c.i.s.ClientStatisticsService] [main] - Client statistics is enabled with period 5 seconds.
{"jobId":927600520204386307,"jobName":"mysql-cdc-to-jdbc","jobStatus":"FAILED","submitTime":1736044736929,"finishTime":1736045095061,"pipelineStateMapperMap":{"PipelineLocation(jobId=927600520204386307, pipelineId=1)":{"pipelineStatus":"FAILED","executionStateMap":{"TaskGroupLocation{jobId=927600520204386307, pipelineId=1, taskGroupId=1}":"CANCELED","TaskGroupLocation{jobId=927600520204386307, pipelineId=1, taskGroupId=30000}":"FAILED"}}},"errorMessage":"java.lang.RuntimeException: One or more fetchers have encountered exception\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)\n\tat org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)\n\tat org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)\n\tat org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)\n\tat org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)\n\tat org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)\n\tat org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)\n\t... 5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)\n\tat io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1239)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)\n\t... 1 more\nCaused by: io.debezium.DebeziumException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT. Error code: 1236; SQLSTATE: HY000.\n\tat io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1194)\n\t... 
5 more\nCaused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID sets and the missing purged transactions are too long to print in this message. For more information, please see the master's error log or the manual for GTID_SUBTRACT.\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)\n\t... 3 more\n"}
Zeta or Flink or Spark Version
zeta
Java or Scala Version
jdk8 and jdk11
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct