|
38 | 38 | import java.util.ArrayList; |
39 | 39 | import java.util.Arrays; |
40 | 40 | import java.util.Collection; |
| 41 | +import java.util.Collections; |
41 | 42 | import java.util.HashMap; |
42 | 43 | import java.util.LinkedHashSet; |
43 | 44 | import java.util.LinkedList; |
44 | 45 | import java.util.List; |
45 | 46 | import java.util.Map; |
46 | 47 | import java.util.Set; |
| 48 | +import java.util.concurrent.ConcurrentHashMap; |
| 49 | +import java.util.concurrent.ConcurrentMap; |
47 | 50 | import java.util.concurrent.Executors; |
48 | 51 | import java.util.concurrent.ScheduledExecutorService; |
49 | 52 | import java.util.concurrent.TimeUnit; |
50 | | -import java.util.concurrent.ConcurrentMap; |
51 | | -import java.util.concurrent.ConcurrentHashMap; |
52 | 53 | import java.util.stream.Collectors; |
53 | | -import java.util.Collections; |
54 | 54 |
|
55 | 55 | import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; |
56 | 56 | import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; |
@@ -123,7 +123,7 @@ public class NacosRegistry extends FailbackRegistry { |
123 | 123 |
|
124 | 124 | private final NacosNamingServiceWrapper namingService; |
125 | 125 |
|
126 | | - private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>(); |
| 126 | + private final ConcurrentMap<URL, Map<NotifyListener, Map<String, EventListener>>> nacosListeners = new ConcurrentHashMap<>(); |
127 | 127 |
|
128 | 128 | public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) { |
129 | 129 | super(url); |
@@ -242,8 +242,7 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) { |
242 | 242 | public void doUnsubscribe(URL url, NotifyListener listener) { |
243 | 243 | if (isAdminProtocol(url)) { |
244 | 244 | shutdownServiceNamesLookup(); |
245 | | - } |
246 | | - else { |
| 245 | + } else { |
247 | 246 | Set<String> serviceNames = getServiceNames(url, listener); |
248 | 247 |
|
249 | 248 | doUnsubscribe(url, listener, serviceNames); |
@@ -305,7 +304,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) { |
305 | 304 | Set<String> serviceNames = new LinkedHashSet<>(); |
306 | 305 |
|
307 | 306 | execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE, |
308 | | - getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() |
| 307 | + getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() |
309 | 308 | .stream() |
310 | 309 | .filter(this::isConformRules) |
311 | 310 | .map(NacosServiceName::new) |
@@ -511,40 +510,41 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) { |
511 | 510 |
|
512 | 511 | private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) |
513 | 512 | throws NacosException { |
514 | | - ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); |
515 | | - EventListener nacosListener = listeners.computeIfAbsent(listener, k -> { |
516 | | - EventListener eventListener = event -> { |
517 | | - if (event instanceof NamingEvent) { |
518 | | - NamingEvent e = (NamingEvent) event; |
519 | | - List<Instance> instances = e.getInstances(); |
520 | | - |
521 | | - |
522 | | - if (isServiceNamesWithCompatibleMode(url)) { |
| 513 | + Map<NotifyListener, Map<String, EventListener>> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); |
| 514 | + Map<String, EventListener> eventListenerMap = listeners.computeIfAbsent(listener, k -> new ConcurrentHashMap<>()); |
| 515 | + EventListener nacosListener = eventListenerMap.computeIfAbsent(serviceName, |
| 516 | + name -> event -> { |
| 517 | + if (event instanceof NamingEvent) { |
| 518 | + NamingEvent e = (NamingEvent) event; |
| 519 | + List<Instance> instances = e.getInstances(); |
| 520 | + if (isServiceNamesWithCompatibleMode(url)) { |
| 521 | + |
| 522 | + // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned |
| 523 | + // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 |
| 524 | + NacosInstanceManageUtil.initOrRefreshServiceInstanceList(name, instances); |
| 525 | + instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(name); |
| 526 | + } |
523 | 527 |
|
524 | | - // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned |
525 | | - // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 |
526 | | - NacosInstanceManageUtil.initOrRefreshServiceInstanceList(e.getServiceName(), instances); |
527 | | - instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(e.getServiceName()); |
| 528 | + notifySubscriber(url, listener, instances); |
528 | 529 | } |
529 | | - |
530 | | - notifySubscriber(url, listener, instances); |
531 | | - } |
532 | | - }; |
533 | | - return eventListener; |
534 | | - }); |
| 530 | + }); |
535 | 531 | namingService.subscribe(serviceName, |
536 | 532 | getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), |
537 | 533 | nacosListener); |
538 | 534 | } |
539 | 535 |
|
540 | 536 | private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener) |
541 | 537 | throws NacosException { |
542 | | - ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); |
543 | | - if(notifyListenerEventListenerConcurrentMap == null){ |
| 538 | + Map<NotifyListener, Map<String, EventListener>> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); |
| 539 | + if (notifyListenerEventListenerConcurrentMap == null) { |
| 540 | + return; |
| 541 | + } |
| 542 | + Map<String, EventListener> listenerMap = notifyListenerEventListenerConcurrentMap.get(listener); |
| 543 | + if (listenerMap == null) { |
544 | 544 | return; |
545 | 545 | } |
546 | | - EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener); |
547 | | - if(nacosListener == null){ |
| 546 | + EventListener nacosListener = listenerMap.remove(serviceName); |
| 547 | + if (nacosListener == null) { |
548 | 548 | return; |
549 | 549 | } |
550 | 550 | namingService.unsubscribe(serviceName, |
|
0 commit comments