Conversation

@lzshlzsh
Contributor

@lzshlzsh lzshlzsh commented Oct 9, 2023

This fixes #2538.

@lzshlzsh
Contributor Author

lzshlzsh commented Oct 9, 2023

@loserwang1024 @leonardBang Would you help review this PR?

Contributor

@loserwang1024 loserwang1024 left a comment

@lzshlzsh, thanks a lot for your contribution. I have provided some advice below:


private long maxCompletedCheckpointId;

public IncrementalSourceReader(
Contributor

Since only a few types of CDC source need to commit offsets, why not create another subclass (named something like IncrementalSourceReaderWithCommit; not a great name, just an example)?
Other CDC connectors such as MySQL don't need to maintain this state for checkpoints, and can avoid the redundant operations.
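A minimal, self-contained sketch of the idea (plain Java; the class name, the long-valued offsets, and the method signatures are illustrative placeholders, not the actual flink-cdc API):

```java
import java.util.TreeMap;

// Hypothetical sketch of the subclass idea: a reader that remembers, per
// checkpoint id, the stream offset captured in snapshotState, and on
// notifyCheckpointComplete commits only the offset bound to that checkpoint.
// Offsets are modelled as plain longs instead of the real Offset type.
public class OffsetCommittingReaderSketch {
    // checkpoint id -> stream offset snapshotted for that checkpoint
    private final TreeMap<Long, Long> offsetsByCheckpoint = new TreeMap<>();
    private long committedOffset = -1L;

    /** Called from snapshotState: remember the stream offset for this checkpoint. */
    public void snapshotState(long checkpointId, long streamOffset) {
        offsetsByCheckpoint.put(checkpointId, streamOffset);
    }

    /** Called from notifyCheckpointComplete: commit only the matching offset. */
    public void notifyCheckpointComplete(long checkpointId) {
        Long offset = offsetsByCheckpoint.get(checkpointId);
        if (offset != null) {
            committedOffset = offset; // e.g. advance confirmed_flush_lsn on the slot
        }
    }

    public long getCommittedOffset() {
        return committedOffset;
    }
}
```

Connectors that never commit offsets would simply keep using the base reader and skip this bookkeeping entirely.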

Contributor Author

good idea

Contributor Author

@loserwang1024 Thanks for the review. IncrementalSourceReaderWithCommit has been added, and is currently used for postgres-cdc.

if (split.isStreamSplit()) {
lastCheckPointStreamSplit.put(checkpointId, split.asStreamSplit());
LOG.debug(
"snapshot stream split, checkpoint id {}, stream split {}",
Contributor

Snapshot state of stream split : {}, and checkpoint id is {}.

Contributor Author

done


for (SourceSplitBase split : stateSplits) {
if (split.isStreamSplit()) {
lastCheckPointStreamSplit.put(checkpointId, split.asStreamSplit());
Contributor

In this design, it seems that more than one split may need to snapshot its state under one checkpoint id. A Map<Long, List<>> seems better (just like the Kafka source).

Contributor

What about this? In the for loop, if more than one split meets the condition, a later one will overwrite the earlier one in the map.

Contributor Author

What about this? In the for loop, if more than one split meets the condition, a later one will overwrite the earlier one in the map.

The code has been moved to IncrementalSourceReaderWithCommit. Currently only the StreamSplit is needed, and there should be just one StreamSplit per checkpoint. Maybe a TreeMap<Long, StreamSplit> is enough, instead of a TreeMap<Long, List<>>; do you think so?

Contributor

@loserwang1024 loserwang1024 Oct 26, 2023

Thanks for your explanation. It seems the TreeMap<Long, StreamSplit> entries are just different versions of the same StreamSplit. Each StreamSplit contains a lot of redundant information, such as tableSchemas and finishedSnapshotSplitInfos; only startingOffset is what we need. Why not change TreeMap<Long, StreamSplit> to TreeMap<Long, Offset>?

Contributor Author

Thanks, I agree with you.

Contributor Author

Thanks for your explanation. It seems the TreeMap<Long, StreamSplit> entries are just different versions of the same StreamSplit. Each StreamSplit contains a lot of redundant information, such as tableSchemas and finishedSnapshotSplitInfos; only startingOffset is what we need. Why not change TreeMap<Long, StreamSplit> to TreeMap<Long, Offset>?

I have made the modifications according to your suggestions. Could you please review them when you have time?

// it will begin consuming data from the PostgreSQL replication stream.
// Within PostgresStreamingChangeEventSource, the context's LSN will be updated.
commitLsn =
((PostgresOffset) split.asStreamSplit().getStartingOffset())
Contributor

The biggest problem with this PR: each split is read over [start, end). If we commit the starting offset and then fail over, the message at the starting offset will be lost.

Contributor Author

@lzshlzsh lzshlzsh Oct 17, 2023

Perhaps what you're referring to is the start and end of the primary key range?
This PR is an attempt to solve the data loss problem in the stream stage. The root cause is that the offset of the StreamSplit in the checkpoint state lags behind the confirmed_flush_lsn of the Postgres slot; the WAL data between them is already lost if a failover happens before the next checkpoint succeeds, as in the reproducing test case added by the first commit of this PR.

There is no problem in the scan/snapshot stage.

}
}

@Override
Contributor

@loserwang1024 loserwang1024 Oct 16, 2023

In the onSplitFinished method, remove finished splits from lastCheckPointStreamSplit, or a memory leak will occur.

Contributor

lastCheckPointStreamSplit.headMap(checkpointId, true).clear()
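For illustration, the inclusive headMap view drops the completed checkpoint's entry and every older one in a single call (a self-contained demo with placeholder values, not the reader's actual state):

```java
import java.util.TreeMap;

// Demo of the suggested cleanup: headMap(checkpointId, true) is a view of all
// entries with key <= checkpointId, so clearing it removes the completed
// checkpoint's state together with any older leftovers in the backing map.
public class HeadMapCleanupDemo {
    public static TreeMap<Long, String> remainingAfterCleanup(long completedCheckpointId) {
        TreeMap<Long, String> pending = new TreeMap<>();
        pending.put(1L, "streamSplit@cp1");
        pending.put(2L, "streamSplit@cp2");
        pending.put(3L, "streamSplit@cp3");
        // inclusive head view; clearing the view mutates the backing map
        pending.headMap(completedCheckpointId, true).clear();
        return pending;
    }
}
```

Cleaning up older entries as well matters because a checkpoint may be subsumed (completed implicitly by a later one) without its own notification ever arriving.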

Contributor Author

@lzshlzsh lzshlzsh Oct 25, 2023

Thanks for your review. Yes, the StreamSplit of the current checkpointId should be removed.

* @see CheckpointListener#notifyCheckpointComplete(long)
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
Contributor

It seems this method is no longer used.

Contributor Author

We have to provide a default implementation, as the super interface CheckpointListener does not have one.
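For context, a sub-interface can supply an empty default body so that implementors are not forced to override the callback (a generic Java illustration with hypothetical names, not the Flink interface itself):

```java
// Generic illustration of the empty-default-body pattern: a class implementing
// the listener compiles and runs without overriding the checkpoint callback,
// because the interface supplies a no-op default implementation.
public class DefaultMethodDemo {
    interface Listener {
        // no-op default, so implementors may ignore checkpoint notifications
        default void notifyCheckpointComplete(long checkpointId) throws Exception {}
    }

    // compiles without providing notifyCheckpointComplete itself
    static class NoOpReader implements Listener {}

    public static boolean compilesWithoutOverride() throws Exception {
        new NoOpReader().notifyCheckpointComplete(42L);
        return true;
    }
}
```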

(Long)
postgresOffsetContext
.getOffset()
.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
Contributor

This code is no longer needed, because only the streaming split is allowed to commitCurrentOffset. Moreover, LAST_COMMIT_LSN_KEY is updated by Begin or Commit messages rather than by data messages (see PostgresStreamingChangeEventSource#processMessages -> PostgresStreamingChangeEventSource#commitMessage -> PostgresOffsetContext#updateCommitPosition).

Contributor Author

This code is no longer needed, because only the streaming split is allowed to commitCurrentOffset. Moreover, LAST_COMMIT_LSN_KEY is updated by Begin or Commit messages rather than by data messages (see PostgresStreamingChangeEventSource#processMessages -> PostgresStreamingChangeEventSource#commitMessage -> PostgresOffsetContext#updateCommitPosition).

Thanks for your review, I will modify the code according to your suggestion.
Yes, you are right, LAST_COMMIT_LSN_KEY is updated by Begin or Commit messages.

I want to explain the data loss problem in more detail, because it happened in our production environment during failover. After applying this PR's fix, there has been no data loss so far.

Suppose a checkpoint succeeds and, before notifyCheckpointComplete finishes committing confirmed_flush_lsn, a table UPDATE event arrives. The event appears as a BEGIN/UPDATE/COMMIT LSN sequence, so notifyCheckpointComplete will commit the COMMIT's LSN to the slot. If a failover happens at this moment and the job restores from this checkpoint, it will begin consuming from the slot's confirmed_flush_lsn (that is, the COMMIT's LSN) rather than from the LSN of the StreamSplit in the checkpoint, because the StreamSplit's LSN < the slot's confirmed_flush_lsn. From a high-level view, the table's UPDATE event is lost.

Contributor

Thanks a lot for your explanation, I totally agree with you. Moreover, I have also found that the Postgres offset commit was not bound to a checkpoint id before your PR. If we trigger five consecutive checkpoints and then the first one completes (triggering notifyCheckpointComplete), the offset of the fifth checkpoint will be committed. If we then fail over and restart from the first checkpoint, the WAL between [first_checkpoint, fifth_checkpoint) will already have been recycled.
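The binding problem described here can be sketched as follows (hypothetical, with offsets as plain longs; "naive" mirrors committing the latest snapshotted offset, "bound" the checkpoint-aware behaviour):

```java
import java.util.TreeMap;

// Illustration of the binding problem: five checkpoints snapshot offsets, but
// only the first one completes. Committing the latest offset lets the slot
// recycle WAL that a restore from the first checkpoint still needs; committing
// the offset bound to the completed checkpoint id does not.
public class CommitBindingDemo {
    /** Old behaviour: commit whatever offset was snapshotted most recently. */
    public static long naiveCommit(TreeMap<Long, Long> offsetsByCheckpoint) {
        return offsetsByCheckpoint.lastEntry().getValue();
    }

    /** Checkpoint-bound behaviour: commit only the completed checkpoint's offset. */
    public static long boundCommit(TreeMap<Long, Long> offsetsByCheckpoint, long completedCheckpointId) {
        return offsetsByCheckpoint.get(completedCheckpointId);
    }
}
```

With checkpoints 1..5 snapshotting offsets 100..500 and only checkpoint 1 complete, the naive commit advances the slot to 500 while a restore must replay from 100, which is exactly the recycled-WAL window.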

Contributor

@loserwang1024 loserwang1024 left a comment

LGTM

@leonardBang
Contributor

@lzshlzsh would you like to rebase the PR to latest master so that we can merge it

@lzshlzsh
Contributor Author

@lzshlzsh would you like to rebase the PR to latest master so that we can merge it

Sorry, I saw the message too late. I'll rebase it.

@lzshlzsh
Contributor Author

lzshlzsh commented Jan 4, 2024

@lzshlzsh would you like to rebase the PR to latest master so that we can merge it

@leonardBang I have rebased on master, would you help review it?

@loserwang1024
Contributor

@leonardBang, CC

…d to slot between snapshotState and notifyCheckpointComplete
@leonardBang leonardBang force-pushed the postgres-cdc-data-lost branch from 4d9c0d6 to 91acacc Compare January 19, 2024 09:55
Contributor

@leonardBang leonardBang left a comment

Thanks @lzshlzsh for the contribution and @loserwang1024 for the review work, LGTM

@leonardBang leonardBang merged commit 9ce3656 into apache:master Jan 21, 2024
joyCurry30 pushed a commit to joyCurry30/flink-cdc-connectors that referenced this pull request Mar 22, 2024
…d to slot between snapshotState and notifyCheckpointComplete (apache#2539)

This closes apache#2538.
Co-authored-by: sammieliu <[email protected]>
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025
…d to slot between snapshotState and notifyCheckpointComplete (apache#2539)

This closes apache#2538.
Co-authored-by: sammieliu <[email protected]>

Successfully merging this pull request may close these issues.

[Bug][postgres-cdc] Data lost when new lsn committed to slot between snapshotState and notifyCheckpointComplete
