@loserwang1024 loserwang1024 commented Dec 20, 2023

As discussed in #2867, this PR adds a SNAPSHOT_ONLY mode to the framework and exposes it in the MySQL, PostgreSQL, and MongoDB connectors.
- Oracle will not be exposed until #2909 is fixed.
- SQL Server will not be exposed until #2853 is fixed.

@loserwang1024 loserwang1024 force-pushed the snapshot-only branch 5 times, most recently from 7247b49 to 5dce2b4 Compare December 22, 2023 09:27
loserwang1024 added a commit to loserwang1024/flink-cdc-connectors that referenced this pull request Dec 22, 2023
loserwang1024 added a commit to loserwang1024/flink-cdc-connectors that referenced this pull request Dec 25, 2023
@loserwang1024 loserwang1024 changed the title from "[WIP][cdc-connector][cdc-base] Add SNAPSHOT_ONLY mode" to "[cdc-connector][cdc-base] Add SNAPSHOT_ONLY mode" Dec 25, 2023

@Jiabao-Sun Jiabao-Sun left a comment

Thanks @loserwang1024 for this great work.
I left some comments. Please take a look when you have time.

By the way, "SNAPSHOT_ONLY" sounds a bit ambiguous, in fact, it is "INCREMENTAL_SNAPSHOT_ONLY". How about we simply call it "SNAPSHOT"?

Comment on lines 116 to 122
public static boolean isBinlogOnlyStartupMode(StartupMode startupMode) {
    return startupMode == StartupMode.EARLIEST_OFFSET
            || startupMode == StartupMode.LATEST_OFFSET
            || startupMode == StartupMode.SPECIFIC_OFFSETS
            || startupMode == StartupMode.TIMESTAMP;
}

Contributor

Suggested change
- public static boolean isBinlogOnlyStartupMode(StartupMode startupMode) {
-     return startupMode == StartupMode.EARLIEST_OFFSET
-             || startupMode == StartupMode.LATEST_OFFSET
-             || startupMode == StartupMode.SPECIFIC_OFFSETS
-             || startupMode == StartupMode.TIMESTAMP;
- }
+ public boolean isStreamOnly() {
+     return startupMode == StartupMode.EARLIEST_OFFSET
+             || startupMode == StartupMode.LATEST_OFFSET
+             || startupMode == StartupMode.SPECIFIC_OFFSETS
+             || startupMode == StartupMode.TIMESTAMP;
+ }
+ public boolean isSnapshotOnly() {
+     return startupMode == StartupMode.SNAPSHOT_ONLY;
+ }

Contributor Author

That sounds good.

Comment on lines 106 to 108
if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) {
    return Boundedness.BOUNDED;
} else {
    return Boundedness.CONTINUOUS_UNBOUNDED;
}

Contributor

Suggested change
- if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) {
-     return Boundedness.BOUNDED;
- } else {
-     return Boundedness.CONTINUOUS_UNBOUNDED;
- }
+ if (sourceConfig.getStartupOptions().isStreamOnly()) {
+     return Boundedness.CONTINUOUS_UNBOUNDED;
+ } else {
+     return Boundedness.BOUNDED;
+ }
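
Note that "not stream-only" and "snapshot-only" are not the same predicate as soon as a mode exists that takes a snapshot and then keeps streaming. The following is a minimal, self-contained sketch of that difference, not the connector's code: the two enums are stand-ins for the real `StartupMode` and Flink's `Boundedness`, `StartupOptionsSketch` and the two `boundednessVia...` methods are hypothetical names, and `INITIAL` is assumed here to mean "snapshot first, then stream".

```java
import java.util.EnumSet;

// Stand-in for the connector's StartupMode. INITIAL is an assumption of this sketch:
// it is taken to mean "read a snapshot, then continue with the change stream".
enum StartupMode { INITIAL, EARLIEST_OFFSET, LATEST_OFFSET, SPECIFIC_OFFSETS, TIMESTAMP, SNAPSHOT_ONLY }

// Stand-in for Flink's Boundedness enum.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

// Hypothetical options holder carrying the two helper predicates from the review.
final class StartupOptionsSketch {
    private static final EnumSet<StartupMode> STREAM_ONLY_MODES =
            EnumSet.of(
                    StartupMode.EARLIEST_OFFSET,
                    StartupMode.LATEST_OFFSET,
                    StartupMode.SPECIFIC_OFFSETS,
                    StartupMode.TIMESTAMP);

    final StartupMode startupMode;

    StartupOptionsSketch(StartupMode startupMode) {
        this.startupMode = startupMode;
    }

    boolean isStreamOnly() {
        return STREAM_ONLY_MODES.contains(startupMode);
    }

    boolean isSnapshotOnly() {
        return startupMode == StartupMode.SNAPSHOT_ONLY;
    }

    // Check from the original lines: bounded only when the job ends after the snapshot.
    Boundedness boundednessViaSnapshotOnly() {
        return isSnapshotOnly() ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    // Check from the suggested lines: every mode that is not stream-only is bounded.
    Boundedness boundednessViaNotStreamOnly() {
        return isStreamOnly() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    public static void main(String[] args) {
        // Print both results side by side for every mode.
        for (StartupMode mode : StartupMode.values()) {
            StartupOptionsSketch options = new StartupOptionsSketch(mode);
            System.out.println(mode + ": " + options.boundednessViaSnapshotOnly()
                    + " vs " + options.boundednessViaNotStreamOnly());
        }
    }
}
```

Under these assumptions the two checks agree for every mode except `INITIAL`, which the second variant would report as bounded, so `isSnapshotOnly()` looks like the closer match to the original condition.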

// snapshot splits as the ending offset to provide a consistent snapshot view at the moment
// of high watermark.
Offset stoppingOffset = offsetFactory.createNoStoppingOffset();
if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) {

Contributor

Suggested change
- if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) {
+ if (sourceConfig.getStartupOptions().isSnapshotOnly()) {

Comment on lines 145 to 147
if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY
        && sourceSplit.isStreamSplit()) {
    // when startupMode = SNAPSHOT_ONLY. the stream split could finish.
    continue;
}

Contributor

Suggested change
- if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY
-         && sourceSplit.isStreamSplit()) {
-     // when startupMode = SNAPSHOT_ONLY. the stream split could finish.
-     continue;
- }
+ if (sourceConfig.getStartupOptions().isSnapshotOnly() && sourceSplit.isStreamSplit()) {
+     // when startupMode = SNAPSHOT_ONLY. the stream split could finish.
+     continue;
+ }

Shall we remove the streamSplit from uncompletedStreamSplits here?

Contributor Author

I don't think that is needed. uncompletedStreamSplits holds stream splits that have not started yet and are still waiting for missing table metadata. Once all metadata has been received, the split is added back in requestStreamSplitMetaIfNeeded. So a finished split cannot be in uncompletedStreamSplits.
Do you mean something like suspendedBinlogSplit in MySQL? We won't support that until the CDC framework gains the ability to scan newly added tables.

private final FetchTask.Context taskContext;
private final ExecutorService executorService;
private volatile ChangeEventQueue<DataChangeEvent> queue;

Contributor

Suggested change

nit

} finally {
    try {
        stopReadTask();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

Contributor

Why do we need this change?

Contributor Author

Before this PR the stream split was always unbounded: we only set this.currentTaskRunning = false; in the catch block when an exception occurred, and the stream split never stopped for any other reason.
Now, in INCREMENTAL_SNAPSHOT_ONLY mode, the job stops after reaching the high watermark, so the read task also has to be stopped when the split finishes normally.
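
The point of moving the cleanup into `finally` can be sketched with a toy reader. This is only an illustration under stated assumptions, not the framework's fetcher: `BoundedStreamReadSketch`, `readUntil`, `isRunning`, and the `failAtOffset` parameter are hypothetical, and a plain counter stands in for reading change events. Only the `currentTaskRunning` flag and the `stopReadTask()` name are taken from the thread.

```java
// Hypothetical reader that stops at a high watermark and always runs its cleanup.
final class BoundedStreamReadSketch {
    private volatile boolean currentTaskRunning;

    // Reads from startOffset up to highWatermark.
    // failAtOffset simulates a read error at that offset; a negative value means "never fail".
    long readUntil(long startOffset, long highWatermark, long failAtOffset) {
        currentTaskRunning = true;
        long offset = startOffset;
        try {
            while (currentTaskRunning && offset < highWatermark) {
                if (offset == failAtOffset) {
                    throw new IllegalStateException("read failed at offset " + offset);
                }
                offset++; // stands in for emitting one change event
            }
            return offset;
        } finally {
            // Runs on both paths: normal completion at the watermark and failure.
            stopReadTask();
        }
    }

    private void stopReadTask() {
        currentTaskRunning = false;
    }

    boolean isRunning() {
        return currentTaskRunning;
    }

    public static void main(String[] args) {
        BoundedStreamReadSketch reader = new BoundedStreamReadSketch();
        System.out.println("stopped at offset " + reader.readUntil(0, 5, -1));
        System.out.println("still running: " + reader.isRunning());
    }
}
```

If the flag were reset only in a `catch` block, a read that ends normally at the watermark would leave it set, which did not matter while the stream split could never end on its own.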

loserwang1024 added a commit to loserwang1024/flink-cdc-connectors that referenced this pull request Dec 27, 2023
@loserwang1024 loserwang1024 changed the title from "[cdc-connector][cdc-base] Add SNAPSHOT_ONLY mode" to "[cdc-connector][cdc-base] Add SNAPSHOT mode" Dec 27, 2023
@loserwang1024
Contributor Author

By the way, "SNAPSHOT_ONLY" sounds a bit ambiguous, in fact, it is "INCREMENTAL_SNAPSHOT_ONLY". How about we simply call it "SNAPSHOT"?

Thanks a lot, I will take it.

loserwang1024 added a commit to loserwang1024/flink-cdc-connectors that referenced this pull request Jan 9, 2024

leonardBang commented Jan 11, 2024

@loserwang1024 could you help check the CI failure?

@leonardBang leonardBang added this to the V3.1.0 milestone Jan 11, 2024

@leonardBang leonardBang left a comment

Thanks @loserwang1024 for the great work and @Jiabao-Sun for the detailed review, LGTM. CI is green, merging...

@leonardBang leonardBang merged commit e3f6829 into apache:master Jan 13, 2024
leonardBang pushed a commit that referenced this pull request Jan 13, 2024
@minchowang
Contributor

@loserwang1024 Hello, why is there still a binlog reading (StreamingChangeEventSource) phase?

@minchowang
Contributor

I found that with skipSnapshotBackfill(true), SNAPSHOT mode now produces the expected result.

@loserwang1024
Contributor Author

Why is there still a binlog reading (StreamingChangeEventSource) phase?

Hi @minchowang, see the discussion in #2867 for more details.

joyCurry30 pushed a commit to joyCurry30/flink-cdc-connectors that referenced this pull request Mar 22, 2024

LinMingQiang commented Sep 14, 2024

Hi @loserwang1024 @leonardBang @Jiabao-Sun ,
I've encountered a few problems.

Premise:

  • Only 1 record in my mysql.
  • Flink 1.8.1
  • flink cdc 3.1 & 3.2
CREATE TEMPORARY TABLE mysql_cdc (
     id STRING,
     name string,
     dt string,
     PRIMARY KEY(id) NOT ENFORCED
     ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'localhost',
     'port' = '3306',
     'username' = 'root',
     'password' = '123456789',
     'database-name' = 'tt',
     'table-name' = 'mysql_cdc_source',
     'scan.startup.mode'='snapshot');
  1. Is BATCH execution mode not supported? Setting execution.runtime-mode=BATCH results in an error.

  2. When I set execution.runtime-mode=STREAMING and checkpointing is disabled, the job never finishes.
    2.1: If I set execution.checkpointing.interval=60s, the job finishes after a checkpoint has been triggered (60s).
    2.2: If I set execution.checkpointing.interval=5s, the job finishes soon (as expected).

dingding : 19x_7ajbnqz93d

loserwang1024 commented Sep 23, 2024

Not supporting BATCH execution mode

Hi @LinMingQiang, execution.runtime-mode=BATCH is still not supported, even though snapshot mode reads the data as a bounded batch. Because we don't yet provide some of the statistics used for batch optimization, running in batch mode would bring no benefit for now.

checkpointing is disabled, the job never finishes.

Yes, the incremental snapshot framework currently relies on checkpointing.
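
The reported timing (no checkpoint, no finish; a 60s interval, finish after about 60s) is what one would see if the end of the snapshot phase is only confirmed by a completed checkpoint. The toy model below illustrates that idea under this assumption. It is not the framework's assigner: `CheckpointGatedFinishSketch` and all of its methods are hypothetical, chosen only to mirror the usual "snapshot state, then notify checkpoint complete" callback pair.

```java
// Toy model: the source counts as finished only after a checkpoint that was
// taken AFTER all snapshot splits were read has completed.
final class CheckpointGatedFinishSketch {
    private int remainingSplits;
    private Long checkpointIdToFinish; // null until a qualifying checkpoint is taken
    private boolean finished;

    CheckpointGatedFinishSketch(int totalSplits) {
        this.remainingSplits = totalSplits;
    }

    // Called when a reader reports one snapshot split as done.
    void onSplitFinished() {
        if (remainingSplits > 0) {
            remainingSplits--;
        }
    }

    // Called when a checkpoint is triggered.
    void snapshotState(long checkpointId) {
        if (remainingSplits == 0 && checkpointIdToFinish == null) {
            checkpointIdToFinish = checkpointId;
        }
    }

    // Called when a checkpoint has completed.
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointIdToFinish != null && checkpointId >= checkpointIdToFinish) {
            finished = true;
        }
    }

    boolean isFinished() {
        return finished;
    }

    public static void main(String[] args) {
        CheckpointGatedFinishSketch source = new CheckpointGatedFinishSketch(1);
        source.onSplitFinished();
        System.out.println("finished without checkpoint: " + source.isFinished());
        source.snapshotState(1L);
        source.notifyCheckpointComplete(1L);
        System.out.println("finished after checkpoint 1: " + source.isFinished());
    }
}
```

In this model a job with checkpointing disabled never receives either callback and therefore never finishes, while a shorter checkpoint interval simply shortens the wait after the last split is read.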
