1717
1818package org .apache .eventmesh .openconnect .offsetmgmt .nacos ;
1919
20- import org .apache .eventmesh .common .config .CommonConfiguration ;
21- import org .apache .eventmesh .common .utils .ConfigurationContextUtil ;
2220import org .apache .eventmesh .openconnect .offsetmgmt .api .config .OffsetStorageConfig ;
2321import org .apache .eventmesh .openconnect .offsetmgmt .api .data .RecordOffset ;
2422import org .apache .eventmesh .openconnect .offsetmgmt .api .storage .ConnectorRecordPartition ;
2523import org .apache .eventmesh .openconnect .offsetmgmt .api .storage .KeyValueStore ;
2624import org .apache .eventmesh .openconnect .offsetmgmt .api .storage .MemoryBasedKeyValueStore ;
2725import org .apache .eventmesh .openconnect .offsetmgmt .api .storage .OffsetManagementService ;
2826
29- import org .apache .commons .lang3 .StringUtils ;
30-
31- import java .util .ArrayList ;
32- import java .util .Collections ;
3327import java .util .List ;
3428import java .util .Map ;
35- import java .util .Objects ;
36- import java .util .Properties ;
37- import java .util .concurrent .ConcurrentHashMap ;
38- import java .util .concurrent .ConcurrentMap ;
3929import java .util .concurrent .Executor ;
40- import java .util .concurrent .atomic .AtomicBoolean ;
41-
42- import org .checkerframework .checker .units .qual .C ;
4330
4431import com .alibaba .nacos .api .NacosFactory ;
4532import com .alibaba .nacos .api .config .ConfigService ;
46- import com .alibaba .nacos .api .config .ConfigType ;
4733import com .alibaba .nacos .api .config .listener .Listener ;
4834import com .alibaba .nacos .api .exception .NacosException ;
49- import com .alibaba .nacos .api .naming .NamingService ;
50- import com .alibaba .nacos .api .naming .pojo .Instance ;
51- import com .alibaba .nacos .client .naming .NacosNamingService ;
52- import com .alibaba .nacos .common .utils .CollectionUtils ;
5335import com .alibaba .nacos .common .utils .JacksonUtils ;
5436import com .fasterxml .jackson .core .type .TypeReference ;
5537
5638import lombok .Getter ;
57- import lombok .Setter ;
5839import lombok .extern .slf4j .Slf4j ;
5940
6041@ Slf4j
@@ -77,25 +58,6 @@ public class NacosConfigService implements OffsetManagementService {
7758
7859 @ Override
7960 public void start () {
80- listener = new Listener () {
81- @ Override
82- public Executor getExecutor () {
83- return null ;
84- }
85-
86- @ Override
87- public void receiveConfigInfo (String configInfo ) {
88- log .info ("receive configInfo: {}" , configInfo );
89- Map <ConnectorRecordPartition , RecordOffset > partitionRecordOffsetMap = JacksonUtils .toObj (configInfo ,
90- new TypeReference <Map <ConnectorRecordPartition , RecordOffset >>() {
91- });
92- // 整理configInfo 并更新内存中的offset
93- partitionRecordOffsetMap .forEach (
94- (connectorRecordPartition , recordOffset ) -> mergeOffset (connectorRecordPartition , recordOffset )
95- );
96- }
97- };
98-
9961 try {
10062 configService .addListener (dataId , group , listener );
10163 } catch (NacosException e ) {
@@ -189,20 +151,27 @@ public void initialize(OffsetStorageConfig config) {
189151 this .positionStore = new MemoryBasedKeyValueStore <>();
190152 try {
191153 configService = NacosFactory .createConfigService (serverAddr );
192- listener = new Listener () {
193- @ Override
194- public Executor getExecutor () {
195- return null ;
196- }
197-
198- @ Override
199- public void receiveConfigInfo (String configInfo ) {
200- System .out .println ("receive changed config: " + configInfo );
201- }
202- };
203154 } catch (NacosException e ) {
204155 log .error ("nacos init error" , e );
205156 }
157+ this .listener = new Listener () {
158+ @ Override
159+ public Executor getExecutor () {
160+ return null ;
161+ }
162+
163+ @ Override
164+ public void receiveConfigInfo (String configInfo ) {
165+ log .info ("receive configInfo: {}" , configInfo );
166+ Map <ConnectorRecordPartition , RecordOffset > partitionRecordOffsetMap = JacksonUtils .toObj (configInfo ,
167+ new TypeReference <Map <ConnectorRecordPartition , RecordOffset >>() {
168+ });
169+ // update the offset in memory store
170+ partitionRecordOffsetMap .forEach (
171+ (connectorRecordPartition , recordOffset ) -> mergeOffset (connectorRecordPartition , recordOffset )
172+ );
173+ }
174+ };
206175
207176 }
208177
0 commit comments