Conversation

@jw-itq
Contributor

@jw-itq jw-itq commented Feb 17, 2025

Purpose of this pull request

Synchronize the Flink-CDC code to fix the Mongo-CDC issue.

When the MongoDB CDC connector tries to create a cursor with an expired resume token during the stream task fetching stage, it crashes with a fatal exception: Command failed with error 280 (ChangeStreamFatalError): cannot resume stream; the resume token was not found.

This PR adds fallback logic that creates the cursor from a timestamp instead. The fallback runs only when all of the following hold:

Mongo CDC is in the StreamTaskFetch stage
A ChangeStreamFatalError (280) is raised
The current ChangeStreamOffset has a valid timestamp field
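
The three conditions above can be read as a single guard. The following is only a minimal sketch of that reading, not the connector's actual code: the class, enum, and method names are hypothetical, and treating "valid timestamp" as "non-null and positive" is an assumption made for illustration.

```java
/** Hypothetical sketch of the fallback guard; names are illustrative, not SeaTunnel's. */
final class ResumeFallbackGuard {
    /** Server error code for ChangeStreamFatalError, as quoted in the description above. */
    static final int CHANGE_STREAM_FATAL_ERROR = 280;

    /** Hypothetical stand-in for the connector's task stage. */
    enum Stage { SNAPSHOT_TASK_FETCH, STREAM_TASK_FETCH }

    /**
     * Returns true only when all three conditions hold.
     * Assumption: a timestamp counts as valid when it is non-null and positive.
     */
    static boolean shouldFallBackToTimestamp(Stage stage, int errorCode, Long offsetTimestampSeconds) {
        return stage == Stage.STREAM_TASK_FETCH
                && errorCode == CHANGE_STREAM_FATAL_ERROR
                && offsetTimestampSeconds != null
                && offsetTimestampSeconds > 0;
    }
}
```

If any one condition fails, the guard returns false and the original exception would be rethrown unchanged.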

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

jw-itq added 24 commits August 22, 2024 19:10
Member

@Hisoka-X Hisoka-X left a comment

Thanks @jw-itq. Is there a way to add a test case that verifies this?

@jw-itq
Contributor Author

jw-itq commented Feb 18, 2025

Thanks @jw-itq. Is there a way to add a test case that verifies this?

OK, I'll give it a try. It might take some time.

@jw-itq
Contributor Author

jw-itq commented Feb 19, 2025

This PR also fixes the issue where MongoDB CDC fails to resume from the savepoint during restart recovery.

@github-actions github-actions bot added the e2e label Feb 22, 2025
@jw-itq
Contributor Author

jw-itq commented Mar 3, 2025

@Hisoka-X It is difficult to write a test case that reproduces an invalid resume token, because SeaTunnel parses its checkpoint file automatically and the file cannot be specified or modified. Can I reproduce this abnormal case in a test by modifying the checkpoint file?

@Hisoka-X
Member

Hisoka-X commented Mar 3, 2025

@Hisoka-X It is difficult to write a test case that reproduces an invalid resume token, because SeaTunnel parses its checkpoint file automatically and the file cannot be specified or modified. Can I reproduce this abnormal case in a test by modifying the checkpoint file?

sure.

@github-actions github-actions bot added the Zeta label Mar 5, 2025
@jw-itq
Contributor Author

jw-itq commented Mar 6, 2025

@Hisoka-X Fixed. Please help review. thanks.

@jw-itq
Contributor Author

jw-itq commented Mar 10, 2025

@Hisoka-X Could you please help me review it? Thanks!

@Hisoka-X
Member

Waiting for the test cases to pass.

@jw-itq
Contributor Author

jw-itq commented Mar 10, 2025

Waiting for the test cases to pass.

Only the check shown in the attached image has not passed, but I don't understand why.

@jw-itq
Contributor Author

jw-itq commented Mar 11, 2025

Waiting for the test cases to pass.

Only the check shown in the attached image has not passed, but I don't understand why.

@Hisoka-X Hi, sorry to bother you. Does this need to be resolved? Could you help me take a look? Thanks.

@jw-itq
Contributor Author

jw-itq commented Mar 11, 2025

@Hisoka-X The test case has passed. Please help review it, thanks!

Member

@Hisoka-X Hisoka-X left a comment

Thanks @jw-itq !

try {
    next = Optional.ofNullable(changeStreamCursor.tryNext());
} catch (MongoCommandException e) {
    if (MongodbUtils.checkIfChangeStreamCursorExpires(e)) {

Member

Can we add an option that lets the user decide whether to fall back to timestamp restart mode or throw the exception directly?

Contributor Author

Can we add an option that lets the user decide whether to fall back to timestamp restart mode or throw the exception directly?

OK, thank you! The resume token failure surfaces as error 280 in Mongo. At that point the resume token is invalid, but the timestamp is generally still valid, which means the oplog entries may still exist. If the timestamp cannot be found either, the oplog entries are gone as well, so breakpoint recovery is impossible and an exception is thrown as usual. The fallback exists to make recovery from breakpoints as smooth as possible, so I am not sure it is necessary to add a configuration option for users to choose from.
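
To make the recovery order described in this comment concrete, here is a minimal sketch under stated assumptions. `CursorRecovery`, `CommandFailure`, and `openCursor` are hypothetical names; `CommandFailure` only stands in for the driver's `MongoCommandException` so the example runs without the MongoDB driver, and the two suppliers stand in for "open from resume token" and "open from timestamp".

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the recovery order; not the connector's actual code. */
final class CursorRecovery {
    /** Hypothetical stand-in for MongoCommandException carrying the server error code. */
    static final class CommandFailure extends RuntimeException {
        final int code;

        CommandFailure(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    static final int CHANGE_STREAM_FATAL_ERROR = 280;

    /**
     * Tries the resume token first. Only a 280 failure triggers the timestamp attempt;
     * if the timestamp attempt also fails, its exception propagates to the caller.
     */
    static <T> T openCursor(Supplier<T> fromResumeToken, Supplier<T> fromTimestamp) {
        try {
            return fromResumeToken.get();
        } catch (CommandFailure e) {
            if (e.code != CHANGE_STREAM_FATAL_ERROR) {
                throw e; // unrelated failure: do not mask it
            }
            // May itself throw when the oplog no longer covers the timestamp.
            return fromTimestamp.get();
        }
    }
}
```

In this sketch the job still fails when both the token and the timestamp are unusable, which matches the behavior argued for in the comment.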

Member

@Hisoka-X Hisoka-X Mar 12, 2025

Will cursor expiry cause data loss (or duplicate reads)? If we can be sure this behavior does not lose any data, then not providing an option is OK.

Contributor Author

Will cursor expiry cause data loss (or duplicate reads)? If we can be sure this behavior does not lose any data, then not providing an option is OK.

If the current time is used, data loss is possible. The original code did not include the current-timestamp logic, and I don't understand why it was added at that time.

This commit also resolves the issue of a large amount of duplicate data being written when Mongo CDC restarts.
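
The data-loss risk of restarting from the current time can be illustrated with plain numbers. This is a toy sketch, not MongoDB behavior: `StartPointDemo` and `replayFrom` are hypothetical, event times are plain epoch seconds, and the sketch assumes a stream delivers every event at or after its start point.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration; event times are plain epoch seconds, not oplog entries. */
final class StartPointDemo {
    /** Returns the events a stream would deliver when it starts at startAtSeconds (inclusive). */
    static List<Long> replayFrom(List<Long> eventTimes, long startAtSeconds) {
        return eventTimes.stream()
                .filter(t -> t >= startAtSeconds)
                .collect(Collectors.toList());
    }
}
```

With events at 100, 110, and 120, a checkpointed timestamp of 105, and a current time of 130, `replayFrom(events, 105)` returns the events at 110 and 120, while `replayFrom(events, 130)` returns nothing, so the two events written after the checkpoint would be skipped.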

Contributor Author

Will cursor expiry cause data loss (or duplicate reads)? If we can be sure this behavior does not lose any data, then not providing an option is OK.

Data loss does not usually occur, because the checkpoint saves both the resume token and the timestamp. If the timestamp is valid, there is no problem; if it is invalid, an exception is thrown.

@hailin0 hailin0 merged commit afc990d into apache:dev Mar 13, 2025
10 checks passed