Skip to content

client adapter同个instance配置多个group会共享CanalMsgConsumer,从而出现并发问题#5175

Merged
agapple merged 1 commit into
alibaba:masterfrom
lwd-coding:master
Jun 26, 2024
Merged

client adapter同个instance配置多个group会共享CanalMsgConsumer,从而出现并发问题#5175
agapple merged 1 commit into
alibaba:masterfrom
lwd-coding:master

Conversation

@lwd-coding

Copy link
Copy Markdown
Contributor

问题现象:
kafka模式下同个instance配置多个group,启动必然会出现:KafkaConsumer is not safe for multi-threaded access Error sync and rollback的问题
image
原因:
每个group都会对应一个AdapterProcessor,AdapterProcessor初始化的时候会通过loader获取CanalMsgConsumer,CanalMsgConsumer实例会被ExtensionLoader缓存在EXTENSION_KEY_INSTANCE,key是name + "-" + key;而这里的key外部只会传入destination,所以导致同个instance多个group都会复用同一个CanalMsgConsumer,在kafka模式下就会出现多线程访问同一个kafkacomsumer的问题
解决方法:
传入destination + "-" + groupId作为key

@agapple

agapple commented Jun 26, 2024

Copy link
Copy Markdown
Member
  1. AdapterProcessor里的getExtension存在复用问题,导致consumer出现复用
  2. 每个consumer会设置topic和groupId,不会出现版本升级后offest丢失

@agapple agapple merged commit 9493ff7 into alibaba:master Jun 26, 2024
@agapple

agapple commented Jun 26, 2024

Copy link
Copy Markdown
Member

tks

agapple added a commit that referenced this pull request Jul 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants