-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[cdc-connector][cdc-base] Add SNAPSHOT mode #2901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
7247b49 to
5dce2b4
Compare
5dce2b4 to
34c9800
Compare
34c9800 to
454a485
Compare
|
@leonardBang , @ruanhang1993 , @GOODBOY008 , @Jiabao-Sun , CC |
Jiabao-Sun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @loserwang1024 for this great work.
I left some comments. Please take a look when you have time.
By the way, "SNAPSHOT_ONLY" sounds a bit ambiguous, in fact, it is "INCREMENTAL_SNAPSHOT_ONLY". How about we simply call it "SNAPSHOT"?
| public static boolean isBinlogOnlyStartupMode(StartupMode startupMode) { | ||
| return startupMode == StartupMode.EARLIEST_OFFSET | ||
| || startupMode == StartupMode.LATEST_OFFSET | ||
| || startupMode == StartupMode.SPECIFIC_OFFSETS | ||
| || startupMode == StartupMode.TIMESTAMP; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public static boolean isBinlogOnlyStartupMode(StartupMode startupMode) { | |
| return startupMode == StartupMode.EARLIEST_OFFSET | |
| || startupMode == StartupMode.LATEST_OFFSET | |
| || startupMode == StartupMode.SPECIFIC_OFFSETS | |
| || startupMode == StartupMode.TIMESTAMP; | |
| } | |
| public boolean isStreamOnly() { | |
| return startupMode == StartupMode.EARLIEST_OFFSET | |
| || startupMode == StartupMode.LATEST_OFFSET | |
| || startupMode == StartupMode.SPECIFIC_OFFSETS | |
| || startupMode == StartupMode.TIMESTAMP; | |
| } | |
| public boolean isSnapshotOnly() { | |
| return startupMode == StartupMode.SNAPSHOT_ONLY; | |
| } | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good.
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) { | ||
| return Boundedness.BOUNDED; | ||
| } else { | ||
| return Boundedness.CONTINUOUS_UNBOUNDED; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) { | |
| return Boundedness.BOUNDED; | |
| } else { | |
| return Boundedness.CONTINUOUS_UNBOUNDED; | |
| } | |
| if (sourceConfig.getStartupOptions().isStreamOnly()) { | |
| return Boundedness.CONTINUOUS_UNBOUNDED; | |
| } else { | |
| return Boundedness.BOUNDED; | |
| } |
...flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
Show resolved
Hide resolved
| // snapshot splits as the ending offset to provide a consistent snapshot view at the moment | ||
| // of high watermark. | ||
| Offset stoppingOffset = offsetFactory.createNoStoppingOffset(); | ||
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY) { | |
| if (sourceConfig.getStartupOptions().isSnapshotOnly()) { |
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY | ||
| && sourceSplit.isStreamSplit()) { | ||
| // when startupMode = SNAPSHOT_ONLY. the stream split could finish. | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (sourceConfig.getStartupOptions().startupMode == StartupMode.SNAPSHOT_ONLY | |
| && sourceSplit.isStreamSplit()) { | |
| // when startupMode = SNAPSHOT_ONLY. the stream split could finish. | |
| continue; | |
| } | |
| if (sourceConfig.getStartupOptions().isSnapshotOnly() && sourceSplit.isStreamSplit()) { | |
| // when startupMode = SNAPSHOT_ONLY. the stream split could finish. | |
| continue; | |
| } |
Shall we remove the streamSplit from uncompletedStreamSplits here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is no need. uncompletedStreamSplits puts streamSplit which not start and wait for lacked table metadata. When all metadata are received, will add back in requestStreamSplitMetaIfNeeded. So a finished split must not in uncompletedStreamSplits.
You mean like suspendedBinlogSplit in mysql? We don't support it until add new scan table ability in cdc framework.
| private final FetchTask.Context taskContext; | ||
| private final ExecutorService executorService; | ||
| private volatile ChangeEventQueue<DataChangeEvent> queue; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
| } finally { | ||
| try { | ||
| stopReadTask(); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the stream split is unbounded before this PR, we set this.currentTaskRunning = false; when error exception in catch , and stream split never will stop for other reason.
Now in INCREMENTAL_SNAPSHOT_ONLY mode, the job will stop after reaching to the highwatermark.
454a485 to
ce81f94
Compare
Thanks a lot, I will take it. |
72f4b29 to
5db114a
Compare
5db114a to
d6067b5
Compare
|
@loserwang1024 could you help check the CI failure? |
d6067b5 to
90f2589
Compare
90f2589 to
0579a61
Compare
leonardBang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @loserwang1024 for the great work and @Jiabao-Sun for the detailed review, LGTM. Ci has been green, merging...
…Source framework (#2901)
|
@loserwang1024 Hello, Why still have reading binlog(StreamingChangeEventSource) phase ? |
|
I discovered that |
Hi, @minchowang , See more details in discussion of #2867. |
…Source framework (apache#2901)
|
Hi @loserwang1024 @leonardBang @Jiabao-Sun , premise :
dingding : 19x_7ajbnqz93d |
hi, @LinMingQiang, still not support execution.runtime-mode=BATCH now, though snapshot mode will read in batch. Because we still don't support some statistic data for batch optimization, it's meaningless in batch mode.
Yes, the incremental snapshot framework rely on checkpoint now. |
…Source framework (apache#2901)
As shown in #2867, Add SNAPSHOT_ONLY mode to framework, and expose to mysql, postgresSQL, mongo.
- Oracle are not exposed until #2909 is fixed.
- Sqlserver are not exposed until #2853 is fixed.)