[postgres] Fix data lost problem when new lsn committed to slot between snapshotState and notifyCheckpointComplete #2539
Conversation
@loserwang1024 @leonardBang Would you help to review this PR?
loserwang1024 left a comment
@lzshlzsh, thanks a lot for your contribution. I have provided some advice below:
private long maxCompletedCheckpointId;

public IncrementalSourceReader(
Since only a few types of CDC source need to commit offsets, why not create another subclass (named something like IncrementalSourceReaderWithCommit; not a great name, just an example)?
Other CDC connectors such as MySQL have no need to maintain this state for checkpoints and should not have to perform these redundant operations.
good idea
@loserwang1024 Thanks for the review. IncrementalSourceReaderWithCommit has been added and is currently used by postgres-cdc.
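For readers following this thread, here is a minimal standalone skeleton of the idea; the class and method names below are illustrative, not the exact PR code. The base reader stays free of commit logic, and only the commit-aware subclass used by postgres-cdc adds the per-checkpoint bookkeeping.

```java
// Illustrative skeleton only; names do not match the PR's classes exactly.
public class ReaderSplitSketch {

    /** Generic reader: no per-checkpoint bookkeeping (e.g. mysql-cdc style connectors). */
    public static class IncrementalReaderBase {
        public void snapshotState(long checkpointId) {
            // snapshot splits as before
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // no-op: nothing to commit
        }
    }

    /** Commit-aware reader: only connectors that must confirm offsets (e.g. postgres-cdc) use it. */
    public static class IncrementalReaderWithCommit extends IncrementalReaderBase {
        @Override
        public void snapshotState(long checkpointId) {
            super.snapshotState(checkpointId);
            // additionally remember the current stream offset under this checkpointId
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // commit the offset remembered for checkpointId (see the later sketches in this thread)
        }
    }
}
```

This split keeps connectors such as mysql-cdc unaffected, since they never pay for the extra state.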
if (split.isStreamSplit()) {
    lastCheckPointStreamSplit.put(checkpointId, split.asStreamSplit());
    LOG.debug(
            "snapshot stream split, checkpoint id {}, stream split {}",
Suggested log message: "Snapshot state of stream split: {}, and checkpoint id is {}."
done
for (SourceSplitBase split : stateSplits) {
    if (split.isStreamSplit()) {
        lastCheckPointStreamSplit.put(checkpointId, split.asStreamSplit());
In this design, it seems that more than one split may need to snapshot its state under a single checkpoint id. A Map&lt;Long, List&lt;&gt;&gt; seems better (just like the Kafka source).
What about this case: in the for loop, if more than one split meets the requirement, a later one will overwrite the earlier one in the map.
The code has been moved to IncrementalSourceReaderWithCommit. Currently, only the StreamSplit is needed, and there should be just one StreamSplit per checkpoint. Maybe a TreeMap&lt;Long, StreamSplit&gt; is enough instead of a TreeMap&lt;Long, List&lt;&gt;&gt;, what do you think?
Thanks for your explanation. It seems that the values in TreeMap&lt;Long, StreamSplit&gt; are just different versions of the same stream split, and each StreamSplit carries a lot of redundant information such as tableSchemas and finishedSnapshotSplitInfos. Only startingOffset is actually needed. Why not change TreeMap&lt;Long, StreamSplit&gt; to TreeMap&lt;Long, Offset&gt;?
Thanks, I agree with you.
I have made the modifications according to your suggestions. Could you please review them when you have time?
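A rough sketch of the snapshotState side of that suggestion, reusing the identifiers quoted in this thread; the field name lastCheckpointOffsets and the surrounding signature are assumptions, not the PR's exact code:

```java
// checkpointId -> starting offset of the stream split captured at that checkpoint
private final TreeMap<Long, Offset> lastCheckpointOffsets = new TreeMap<>();

@Override
public List<SourceSplitBase> snapshotState(long checkpointId) {
    List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
    for (SourceSplitBase split : stateSplits) {
        if (split.isStreamSplit()) {
            // keep only the starting offset; tableSchemas and finishedSnapshotSplitInfos
            // carried by the StreamSplit would be redundant here
            lastCheckpointOffsets.put(checkpointId, split.asStreamSplit().getStartingOffset());
        }
    }
    return stateSplits;
}
```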
// it will begin consuming data from the PostgreSQL replication stream.
// Within PostgresStreamingChangeEventSource, the context's LSN will be updated.
commitLsn =
        ((PostgresOffset) split.asStreamSplit().getStartingOffset())
The biggest problem with this PR: each split is read within [start, end). If the starting offset is committed and then a failover happens, the message at the starting offset will be lost.
Perhaps what you're referring to is the start and end of the primary-key range?
This PR attempts to solve the data-loss problem in the stream stage. The root cause is that the offset of the stream split in the checkpoint state lags behind the confirmed_flush_lsn of the Postgres slot; the WAL data between them is lost if a failover happens before the next checkpoint succeeds, as in the reproducing test case added by the first commit of this PR.
There is no problem in the scan/snapshot stage.
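To make that gap concrete, here is a small hedged diagnostic (all connection parameters are placeholders, and the class is not part of the PR) that reads the slot's confirmed_flush_lsn from pg_replication_slots; if it is ahead of the LSN stored in the checkpointed stream split, the WAL in between is no longer guaranteed to be replayed after a failover.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public final class SlotLsnProbe {

    /**
     * Returns the slot's confirmed_flush_lsn as text, or null if the slot does not exist.
     * Compare it with the LSN kept in the checkpointed stream split to see the gap.
     */
    public static String confirmedFlushLsn(
            String jdbcUrl, String user, String password, String slotName) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
                PreparedStatement ps =
                        conn.prepareStatement(
                                "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = ?")) {
            ps.setString(1, slotName);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }
}
```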
    }
}

@Override
In the onSplitFinished method, remove finished splits from lastCheckPointStreamSplit, or a memory leak will occur.
lastCheckPointStreamSplit.headMap(checkpointId, true).clear()
Thanks for your review. Yes, the StreamSplit of the current checkpointId should be removed.
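Continuing the snapshotState fragment sketched earlier in this thread, the cleanup on checkpoint completion could look roughly like this; commitOffset is a hypothetical hook, not the PR's exact API:

```java
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // commit the offset recorded for this checkpoint (or the latest earlier one, if any)
    Map.Entry<Long, Offset> entry = lastCheckpointOffsets.floorEntry(checkpointId);
    if (entry != null) {
        commitOffset(entry.getValue()); // hypothetical hook that confirms the LSN on the Postgres slot
        // drop this checkpoint's entry and all older ones so the map cannot leak memory
        lastCheckpointOffsets.headMap(checkpointId, true).clear();
    }
}
```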
 * @see CheckpointListener#notifyCheckpointComplete(long)
 */
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
It seems that this method is no longer used.
We have to provide a default implementation, as the parent CheckpointListener interface does not provide one.
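A minimal sketch of why the empty default is needed; the interface name below is an assumption, only the reference to Flink's CheckpointListener comes from the quoted code:

```java
import org.apache.flink.api.common.state.CheckpointListener;

// Illustrative interface (not the PR's exact name): CheckpointListener declares
// notifyCheckpointComplete(long) without a default, so the extending interface
// supplies an empty default for implementations that have nothing to commit.
public interface OffsetCommitAware extends CheckpointListener {

    @Override
    default void notifyCheckpointComplete(long checkpointId) throws Exception {
        // no-op by default; postgres-cdc overrides this to commit its offset to the slot
    }
}
```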
(Long)
        postgresOffsetContext
                .getOffset()
                .get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
This code is no longer needed, because only the streaming split is allowed to commit the current offset. Moreover, LAST_COMMIT_LSN_KEY is updated by BEGIN or COMMIT rather than by data messages (see PostgresStreamingChangeEventSource#processMessages -> PostgresStreamingChangeEventSource#commitMessage -> PostgresOffsetContext#updateCommitPosition).
Thanks for your review, I will modify the code according to your suggestion. Yes, you are right, LAST_COMMIT_LSN_KEY is updated by BEGIN or COMMIT messages.
I want to explain the data-loss problem in more detail, because it occurred in our production environment during failover; since applying this PR's fix, we have seen no data loss.
Suppose a checkpoint succeeds and, before notifyCheckpoint finishes committing confirmed_flush_lsn, a table UPDATE event arrives as a BEGIN/UPDATE/COMMIT LSN sequence; notifyCheckpoint will then commit the COMMIT's LSN to the slot. If a failover happens at this moment and the job restores from that checkpoint, it begins consuming from the slot's confirmed_flush_lsn (i.e. the COMMIT's LSN) rather than from the LSN stored in the checkpoint's stream split, because the stream split's LSN &lt; the slot's confirmed_flush_lsn. From a high-level view, the table's UPDATE event is lost.
Thanks a lot for your explanation, I totally agree with you. Moreover, I have also found that the Postgres offset commit was not bound to the checkpoint id before your PR: if five checkpoints are triggered consecutively and then the first one completes (triggering notifyCheckpoint), the offset of the fifth checkpoint gets committed. If a failover then restarts from the first checkpoint, the WAL between [first_checkpoint, fifth_checkpoint) will already have been recycled.
loserwang1024 left a comment
LGTM
@lzshlzsh would you like to rebase the PR onto the latest master so that we can merge it?
Sorry, I saw the message too late. I'll rebase it.
Force-pushed a87ebda to 4d9c0d6
@leonardBang I have rebased on master, would you help to review?
@leonardBang, CC
[postgres] Fix data lost problem when new lsn committed to slot between snapshotState and notifyCheckpointComplete
Force-pushed 4d9c0d6 to 91acacc
leonardBang left a comment
Thanks @lzshlzsh for the contribution and @loserwang1024 for the review work, LGTM
[postgres] Fix data lost problem when new lsn committed to slot between snapshotState and notifyCheckpointComplete (apache#2539) This closes apache#2538. Co-authored-by: sammieliu <[email protected]>
This fixes #2538.