当zk出现session time out可能会导致instance出现多个消费者,从而出现乱序,数据丢失#5270
Merged
Conversation
Member
|
tks |
agapple
added a commit
that referenced
this pull request
Sep 13, 2024
Member
|
优化了下stop/start的并发控制,增加stop by latch.wait()的动作,确保正常退出 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
问题现象:
kafka已经配置了max.in.flight.requests.per.connection = 1但仍发现canal deploy往kafka发送消息的时候会存在乱序现象,发生乱序的时候存在如下关键日志信息:

instance日志:
zk日志:

问题分析:
每个instance都会通过监听各自在zk的临时数据节点runningData来实现HA机制,当出现极端的网络波动或者假死比如频繁full gc等,就会导致zk和客户端的会话无法正常续期,从而出现会话超时导致临时节点删除;当故障恢复此时客户端会触发没有改数据节点事件,改事件会触发ServerRunningMonitor.initRunning


initRunning如果抢占runningData成功则会调用CanalMQStarter.startDestination,startDestination的逻辑是先stop再start,由于stop只是设置一个标识位,只有下次轮询才会退出,这时候立马start一个线程就会出现多线程进行get,commit,rollback,存在乱序,丢失数据的可能
同个instance出现多线程生产者问题案例:
乱序场景:
send的时候出现并行发送但是ack的时候确是有序进行,此时不会报错,依然满足按最小batchId进行ack
无法正常commit和rollback场景:
is not the firstly可能会导致无法ack进而触发事件buffer积压从而无法消费;无法rollback进而丢失数据
解决思路:
抢占成功以后如果mq生产者已经启动就没必要stop再start,跳过就行了。这里重启个人觉得没有啥特殊意义