Skip to content

Commit 9621900

Browse files
committed
fix ci check error
1 parent 4eddf36 commit 9621900

File tree

1 file changed

+18
-49
lines changed
  • eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos

1 file changed

+18
-49
lines changed

eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos/NacosConfigService.java

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,25 @@
1717

1818
package org.apache.eventmesh.openconnect.offsetmgmt.nacos;
1919

20-
import org.apache.eventmesh.common.config.CommonConfiguration;
21-
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
2220
import org.apache.eventmesh.openconnect.offsetmgmt.api.config.OffsetStorageConfig;
2321
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
2422
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition;
2523
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.KeyValueStore;
2624
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.MemoryBasedKeyValueStore;
2725
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService;
2826

29-
import org.apache.commons.lang3.StringUtils;
30-
31-
import java.util.ArrayList;
32-
import java.util.Collections;
3327
import java.util.List;
3428
import java.util.Map;
35-
import java.util.Objects;
36-
import java.util.Properties;
37-
import java.util.concurrent.ConcurrentHashMap;
38-
import java.util.concurrent.ConcurrentMap;
3929
import java.util.concurrent.Executor;
40-
import java.util.concurrent.atomic.AtomicBoolean;
41-
42-
import org.checkerframework.checker.units.qual.C;
4330

4431
import com.alibaba.nacos.api.NacosFactory;
4532
import com.alibaba.nacos.api.config.ConfigService;
46-
import com.alibaba.nacos.api.config.ConfigType;
4733
import com.alibaba.nacos.api.config.listener.Listener;
4834
import com.alibaba.nacos.api.exception.NacosException;
49-
import com.alibaba.nacos.api.naming.NamingService;
50-
import com.alibaba.nacos.api.naming.pojo.Instance;
51-
import com.alibaba.nacos.client.naming.NacosNamingService;
52-
import com.alibaba.nacos.common.utils.CollectionUtils;
5335
import com.alibaba.nacos.common.utils.JacksonUtils;
5436
import com.fasterxml.jackson.core.type.TypeReference;
5537

5638
import lombok.Getter;
57-
import lombok.Setter;
5839
import lombok.extern.slf4j.Slf4j;
5940

6041
@Slf4j
@@ -77,25 +58,6 @@ public class NacosConfigService implements OffsetManagementService {
7758

7859
@Override
7960
public void start() {
80-
listener = new Listener() {
81-
@Override
82-
public Executor getExecutor() {
83-
return null;
84-
}
85-
86-
@Override
87-
public void receiveConfigInfo(String configInfo) {
88-
log.info("receive configInfo: {}", configInfo);
89-
Map<ConnectorRecordPartition, RecordOffset> partitionRecordOffsetMap = JacksonUtils.toObj(configInfo,
90-
new TypeReference<Map<ConnectorRecordPartition, RecordOffset>>() {
91-
});
92-
// 整理configInfo 并更新内存中的offset
93-
partitionRecordOffsetMap.forEach(
94-
(connectorRecordPartition, recordOffset) -> mergeOffset(connectorRecordPartition, recordOffset)
95-
);
96-
}
97-
};
98-
9961
try {
10062
configService.addListener(dataId, group, listener);
10163
} catch (NacosException e) {
@@ -189,20 +151,27 @@ public void initialize(OffsetStorageConfig config) {
189151
this.positionStore = new MemoryBasedKeyValueStore<>();
190152
try {
191153
configService = NacosFactory.createConfigService(serverAddr);
192-
listener = new Listener() {
193-
@Override
194-
public Executor getExecutor() {
195-
return null;
196-
}
197-
198-
@Override
199-
public void receiveConfigInfo(String configInfo) {
200-
System.out.println("receive changed config: " + configInfo);
201-
}
202-
};
203154
} catch (NacosException e) {
204155
log.error("nacos init error", e);
205156
}
157+
this.listener = new Listener() {
158+
@Override
159+
public Executor getExecutor() {
160+
return null;
161+
}
162+
163+
@Override
164+
public void receiveConfigInfo(String configInfo) {
165+
log.info("receive configInfo: {}", configInfo);
166+
Map<ConnectorRecordPartition, RecordOffset> partitionRecordOffsetMap = JacksonUtils.toObj(configInfo,
167+
new TypeReference<Map<ConnectorRecordPartition, RecordOffset>>() {
168+
});
169+
// update the offset in memory store
170+
partitionRecordOffsetMap.forEach(
171+
(connectorRecordPartition, recordOffset) -> mergeOffset(connectorRecordPartition, recordOffset)
172+
);
173+
}
174+
};
206175

207176
}
208177

0 commit comments

Comments
 (0)