-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Fix][Mongo-CDC] Fix the issue where mongo isExactlyOnce defaults to true, causing room to malfunction #9454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ing with Elasticsearch's automatic index creation apache#7430
…utomatic index creation conflict apache#7430
…utomatically creating indexes based on templates apache#7430
…utomatic index creation conflict apache#7430
…utomatic index creation conflict apache#7430
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes the issue where the MongoDB connector’s isExactlyOnce default was true (leading to resource spikes on restart) and adds robust handling for expired change-stream cursors and resume tokens.
- Changed the default
isExactlyOncebehavior tofalse - Introduced
MongodbUtilshelpers to detect cursor and resume-token expiration - Enhanced stream fetch task to recreate cursors on failures and respect bounded reads
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| MongodbUtils.java | Added checkIfChangeStreamCursorExpires and checkIfResumeTokenExpires with new constants and imports |
| MongodbStreamFetchTask.java | Wrapped tryNext() in try/catch, fallback logic to reopen cursors, and updated bounded-read enqueuing |
| MongodbFetchTaskContext.java | Changed isExactlyOnce() default from true to false |
| MongodbSourceConfig.java | Changed isExactlyOnce() default from true to false |
| MongodbSourceOptions.java | Added new error-code constants and INVALID_CHANGE_STREAM_ERRORS set |
Comments suppressed due to low confidence (2)
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java:420
- No unit tests currently cover the new
checkIfChangeStreamCursorExpiresandcheckIfResumeTokenExpiresmethods; please add tests to verify both success and failure cases.
public static boolean checkIfChangeStreamCursorExpires(final MongoCommandException e) {
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java:142
- Since the default
isExactlyOncebehavior changed tofalse, please update any user-facing documentation or config reference to reflect this new default.
return false;
| if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) { | ||
| return false; | ||
| } | ||
| String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT); |
Copilot
AI
Jun 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard against a null e.getErrorMessage() before calling toLowerCase(...) to avoid potential NPEs.
| String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT); | |
| String errorMessage = e.getErrorMessage(); | |
| if (errorMessage == null) { | |
| return false; | |
| } | |
| errorMessage = errorMessage.toLowerCase(Locale.ROOT); |
| "Resume token has expired, fallback to timestamp restart mode"); | ||
| } | ||
| changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires); | ||
| next = Optional.ofNullable(changeStreamCursor.tryNext()); |
Copilot
AI
Jun 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second tryNext() call after cursor recreation isn’t wrapped in a try/catch block, so any exception it throws will go unhandled; consider enclosing it similarly to the first call.
| next = Optional.ofNullable(changeStreamCursor.tryNext()); | |
| try { | |
| next = Optional.ofNullable(changeStreamCursor.tryNext()); | |
| } catch (MongoCommandException ex) { | |
| log.error("Exception during second tryNext() call after cursor recreation", ex); | |
| throw ex; | |
| } |
| new HashSet<>( | ||
| asList( | ||
| INVALIDATED_RESUME_TOKEN_ERROR, | ||
| CHANGE_STREAM_FATAL_ERROR, | ||
| CHANGE_STREAM_HISTORY_LOST, | ||
| BSON_OBJECT_TOO_LARGE)); |
Copilot
AI
Jun 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider using Set.of(...) to create an immutable set literal, which is more concise and signals immutability.
| new HashSet<>( | |
| asList( | |
| INVALIDATED_RESUME_TOKEN_ERROR, | |
| CHANGE_STREAM_FATAL_ERROR, | |
| CHANGE_STREAM_HISTORY_LOST, | |
| BSON_OBJECT_TOO_LARGE)); | |
| Set.of( | |
| INVALIDATED_RESUME_TOKEN_ERROR, | |
| CHANGE_STREAM_FATAL_ERROR, | |
| CHANGE_STREAM_HISTORY_LOST, | |
| BSON_OBJECT_TOO_LARGE); |
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add new option named exactly_once to control the behavior? Just like other cdc connector.
ok! thanks! |
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jw-itq for update! Please update the docs.
ok! |
| | poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking for new results on the change stream. | | ||
| | heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. | | ||
| | incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size mb of incremental snapshot. | | ||
| | exactly_once | Boolean| No | false | Enable exactly once semantic. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you tell users the disadvantages of enabling exactly-once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you tell users the disadvantages of enabling exactly-once?
ok!I thought so, too.
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if ci passes.
…true, causing room to malfunction (apache#9454)
Purpose of this pull request
When I was synchronizing a 500 million table of MongoDB, the snapshot phase had not yet been completed, and the cluster restarted. The task needed to be restored, which caused a memory room and CPU utilization to climb. After testing, it was confirmed that the default value of the isExactlyOnce method was true, which caused the issue.
@hailin0. thank you for your guidance!
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide