[Fix] [Mongo-cdc] Fallback to timestamp startup mode when resume token has expired #8754
…ing with Elasticsearch's automatic index creation apache#7430
…utomatic index creation conflict apache#7430
…utomatically creating indexes based on templates apache#7430
Hisoka-X
left a comment
Thanks @jw-itq. Is there a way to add a test case to verify it?
OK, I'll give it a try. It might take some time.

This PR also fixes the issue where MongoDB CDC fails to resume from the savepoint during restart recovery.

@Hisoka-X It is difficult to reproduce the invalid resume token case in a test, because SeaTunnel parses the checkpoint file automatically and it cannot be specified or modified. Can I reproduce this kind of abnormal case by modifying the checkpoint file?
Sure.

@Hisoka-X Fixed. Please help review. Thanks.

@Hisoka-X Could you please help me review it? Thanks!

Waiting for the test case to pass.
@Hisoka-X Hi, sorry to bother you. May I ask whether this still needs to be resolved? Could you take a look? Thanks.

@Hisoka-X The test case has passed. Please help review it. Thanks!
Hisoka-X
left a comment
Thanks @jw-itq !
try {
    next = Optional.ofNullable(changeStreamCursor.tryNext());
} catch (MongoCommandException e) {
    if (MongodbUtils.checkIfChangeStreamCursorExpires(e)) {
Can we add an option that lets the user decide whether to fall back to timestamp restart mode or throw the exception directly?
OK, thank you! The fallback only triggers when MongoDB raises error 280 because the resume token has expired. At that point the resume token is invalid, but the timestamp is generally still valid, which means the oplog entries may still be there. If the timestamp cannot be found either, the oplog entries are missing as well, resuming from the checkpoint is impossible, and the exception is thrown as usual. The fallback is there to make recovery from the checkpoint smoother, so I am wondering whether it is necessary to add a configuration option for users to choose from.
Will cursor expiry lose data (or read duplicates)? If we can make sure this behavior does not lose any data, then it is fine not to provide an option.
If the current time is used as the start position, data loss is possible. The original code did not include the current-timestamp logic, and I don't understand why it was added.

The issue of writing a large amount of duplicate data when Mongo CDC restarts has also been resolved in this submission.
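To illustrate the data-loss concern with a current-time start position, here is a minimal, self-contained sketch. It does not use the MongoDB driver: the oplog is modeled as a plain list of event timestamps, the start position is assumed to be inclusive, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration only; not taken from the SeaTunnel code base. */
final class StartPositionSketch {

    /** Returns the event timestamps a stream delivers when started at startTs (assumed inclusive). */
    static List<Long> replayFrom(List<Long> eventTimestamps, long startTs) {
        return eventTimestamps.stream()
                .filter(ts -> ts >= startTs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> oplog = List.of(100L, 105L, 110L, 120L);
        long checkpointTs = 105L; // timestamp saved in the checkpoint
        long nowTs = 115L;        // wall-clock time at restart

        // Starting from the checkpoint timestamp replays everything since the checkpoint.
        System.out.println(replayFrom(oplog, checkpointTs)); // [105, 110, 120]
        // Starting from the current time skips the event at 110, which is lost.
        System.out.println(replayFrom(oplog, nowTs));        // [120]
    }
}
```

Starting from the checkpoint timestamp may re-deliver an event that was already processed (105 here), but nothing written after the checkpoint is skipped.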
There is usually no data loss, because the checkpoint saves both the resume token and the timestamp. If the timestamp is valid, there is no problem; if it is invalid, an exception is thrown.


Purpose of this pull request
Synchronize the Flink-CDC code to fix the Mongo-CDC issue.
When the MongoDB CDC connector tries to create a cursor with an expired resume token during the stream task fetching stage, it crashes with a fatal exception: error due to Command failed with error 280 (ChangeStreamFatalError): cannot resume stream; the resume token was not found.
This PR adds fallback logic that creates the cursor from the timestamp instead. The fallback only runs when:
Mongo CDC is in the StreamTaskFetch stage
A ChangeStreamFatalError (280) is raised
The current ChangeStreamOffset has a valid timestamp field
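
The three conditions above can be sketched as a single decision function. This is a simplified illustration, not the code in this PR: the enum names, the `decide` method, and the rule that a "valid" timestamp means "present and positive" are all assumptions made for the example, and the MongoDB driver types are replaced by plain values so the snippet runs on its own.

```java
import java.util.OptionalLong;

/** Hypothetical sketch; none of these names come from the SeaTunnel code base. */
final class ResumeFallbackSketch {

    /** Error code quoted in the PR description for ChangeStreamFatalError. */
    static final int CHANGE_STREAM_FATAL_ERROR = 280;

    enum Stage { SNAPSHOT_FETCH, STREAM_FETCH }

    enum Action { RESUME_FROM_TIMESTAMP, RETHROW }

    /** Decides what to do after creating a cursor from the resume token has failed. */
    static Action decide(Stage stage, int errorCode, OptionalLong offsetTimestamp) {
        boolean inStreamStage = stage == Stage.STREAM_FETCH;
        boolean tokenExpired = errorCode == CHANGE_STREAM_FATAL_ERROR;
        // Assumption: a timestamp counts as valid when it is present and positive.
        boolean hasValidTimestamp =
                offsetTimestamp.isPresent() && offsetTimestamp.getAsLong() > 0;
        return (inStreamStage && tokenExpired && hasValidTimestamp)
                ? Action.RESUME_FROM_TIMESTAMP
                : Action.RETHROW;
    }

    public static void main(String[] args) {
        OptionalLong ts = OptionalLong.of(1_700_000_000L);
        System.out.println(decide(Stage.STREAM_FETCH, 280, ts));                   // RESUME_FROM_TIMESTAMP
        System.out.println(decide(Stage.STREAM_FETCH, 280, OptionalLong.empty())); // RETHROW
        System.out.println(decide(Stage.STREAM_FETCH, 1, ts));                     // RETHROW (not error 280)
        System.out.println(decide(Stage.SNAPSHOT_FETCH, 280, ts));                 // RETHROW (wrong stage)
    }
}
```

Any case that fails one of the three checks keeps the original behavior of propagating the exception, which matches the point made in the review thread that a missing or invalid timestamp still ends in a normal failure.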
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note.