Skip to content

Conversation

@jw-itq
Copy link
Contributor

@jw-itq jw-itq commented Jun 18, 2025

Purpose of this pull request

image
When I was synchronizing a 500 million table of MongoDB, the snapshot phase had not yet been completed, and the cluster restarted. The task needed to be restored, which caused a memory room and CPU utilization to climb. After testing, it was confirmed that the default value of the isExactlyOnce method was true, which caused the issue.

@hailin0. thank you for your guidance!

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

jw-itq added 25 commits August 22, 2024 19:10
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes the issue where the MongoDB connector’s isExactlyOnce default was true (leading to resource spikes on restart) and adds robust handling for expired change-stream cursors and resume tokens.

  • Changed the default isExactlyOnce behavior to false
  • Introduced MongodbUtils helpers to detect cursor and resume-token expiration
  • Enhanced stream fetch task to recreate cursors on failures and respect bounded reads

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
MongodbUtils.java Added checkIfChangeStreamCursorExpires and checkIfResumeTokenExpires with new constants and imports
MongodbStreamFetchTask.java Wrapped tryNext() in try/catch, fallback logic to reopen cursors, and updated bounded-read enqueuing
MongodbFetchTaskContext.java Changed isExactlyOnce() default from true to false
MongodbSourceConfig.java Changed isExactlyOnce() default from true to false
MongodbSourceOptions.java Added new error-code constants and INVALID_CHANGE_STREAM_ERRORS set
Comments suppressed due to low confidence (2)

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java:420

  • No unit tests currently cover the new checkIfChangeStreamCursorExpires and checkIfResumeTokenExpires methods; please add tests to verify both success and failure cases.
    public static boolean checkIfChangeStreamCursorExpires(final MongoCommandException e) {

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java:142

  • Since the default isExactlyOnce behavior changed to false, please update any user-facing documentation or config reference to reflect this new default.
        return false;

if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) {
return false;
}
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
Copy link

Copilot AI Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guard against a null e.getErrorMessage() before calling toLowerCase(...) to avoid potential NPEs.

Suggested change
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
String errorMessage = e.getErrorMessage();
if (errorMessage == null) {
return false;
}
errorMessage = errorMessage.toLowerCase(Locale.ROOT);

Copilot uses AI. Check for mistakes.
"Resume token has expired, fallback to timestamp restart mode");
}
changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);
next = Optional.ofNullable(changeStreamCursor.tryNext());
Copy link

Copilot AI Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second tryNext() call after cursor recreation isn’t wrapped in a try/catch block, so any exception it throws will go unhandled; consider enclosing it similarly to the first call.

Suggested change
next = Optional.ofNullable(changeStreamCursor.tryNext());
try {
next = Optional.ofNullable(changeStreamCursor.tryNext());
} catch (MongoCommandException ex) {
log.error("Exception during second tryNext() call after cursor recreation", ex);
throw ex;
}

Copilot uses AI. Check for mistakes.
Comment on lines 89 to 94
new HashSet<>(
asList(
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
BSON_OBJECT_TOO_LARGE));
Copy link

Copilot AI Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider using Set.of(...) to create an immutable set literal, which is more concise and signals immutability.

Suggested change
new HashSet<>(
asList(
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
BSON_OBJECT_TOO_LARGE));
Set.of(
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
BSON_OBJECT_TOO_LARGE);

Copilot uses AI. Check for mistakes.
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add new option named exactly_once to control the behavior? Just like other cdc connector.

@Hisoka-X Hisoka-X changed the title Fix the issue where mongo isExactlyOnce defaults to true, causing room to malfunction [Fix][Mongo-CDC] Fix the issue where mongo isExactlyOnce defaults to true, causing room to malfunction Jun 19, 2025
@jw-itq
Copy link
Contributor Author

jw-itq commented Jun 19, 2025

Could you add new option named exactly_once to control the behavior? Just like other cdc connector.

ok! thanks!

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jw-itq for update! Please update the docs.

@jw-itq
Copy link
Contributor Author

jw-itq commented Jun 25, 2025

Thanks @jw-itq for update! Please update the docs.

ok!

| poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size mb of incremental snapshot. |
| exactly_once | Boolean| No | false | Enable exactly once semantic. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tell users the disadvantages of enabling exactly-once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tell users the disadvantages of enabling exactly-once?

ok!I thought so, too.

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if ci passes.

@corgy-w corgy-w merged commit 814b195 into apache:dev Jun 26, 2025
4 checks passed
dybyte pushed a commit to dybyte/seatunnel that referenced this pull request Jul 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants