1818
1919package org .apache .zookeeper .server .quorum ;
2020
21+ import static java .util .Arrays .asList ;
2122import java .io .IOException ;
2223import java .net .InetAddress ;
2324import java .net .InetSocketAddress ;
2425import java .net .NoRouteToHostException ;
2526import java .net .UnknownHostException ;
27+ import java .time .Duration ;
28+ import java .util .Collection ;
2629import java .util .Collections ;
2730import java .util .List ;
31+ import java .util .NoSuchElementException ;
2832import java .util .Objects ;
2933import java .util .Set ;
3034import java .util .concurrent .ConcurrentHashMap ;
31- import java .util .concurrent .atomic .AtomicReference ;
3235import java .util .stream .Collectors ;
33- import java .util .stream .Stream ;
3436
3537/**
3638 * This class allows to store several quorum and electing addresses.
3739 *
3840 * See ZOOKEEPER-3188 for a discussion of this feature.
3941 */
40- public class MultipleAddresses {
41- private static final int DEFAULT_TIMEOUT = 100 ;
42+ public final class MultipleAddresses {
43+ private static final Duration DEFAULT_TIMEOUT = Duration .ofMillis (500 );
44+
45+ private static Set <InetSocketAddress > newConcurrentHashSet () {
46+ return Collections .newSetFromMap (new ConcurrentHashMap <>());
47+ }
4248
4349 private Set <InetSocketAddress > addresses ;
44- private int timeout ;
50+ private final Duration timeout ;
4551
4652 public MultipleAddresses () {
47- addresses = Collections .newSetFromMap (new ConcurrentHashMap <>());
48- timeout = DEFAULT_TIMEOUT ;
53+ this (Collections .emptyList ());
4954 }
5055
51- public MultipleAddresses (List <InetSocketAddress > addresses ) {
56+ public MultipleAddresses (Collection <InetSocketAddress > addresses ) {
5257 this (addresses , DEFAULT_TIMEOUT );
5358 }
5459
5560 public MultipleAddresses (InetSocketAddress address ) {
56- this (address , DEFAULT_TIMEOUT );
61+ this (asList ( address ) , DEFAULT_TIMEOUT );
5762 }
5863
59- public MultipleAddresses (List <InetSocketAddress > addresses , int timeout ) {
60- this .addresses = Collections . newSetFromMap ( new ConcurrentHashMap <>() );
64+ public MultipleAddresses (Collection <InetSocketAddress > addresses , Duration timeout ) {
65+ this .addresses = newConcurrentHashSet ( );
6166 this .addresses .addAll (addresses );
6267 this .timeout = timeout ;
6368 }
6469
65- public MultipleAddresses (InetSocketAddress address , int timeout ) {
66- addresses = Collections .newSetFromMap (new ConcurrentHashMap <>());
67- addresses .add (address );
68- this .timeout = timeout ;
69- }
70-
71- public int getTimeout () {
72- return timeout ;
73- }
74-
75- public void setTimeout (int timeout ) {
76- this .timeout = timeout ;
77- }
78-
7970 public boolean isEmpty () {
8071 return addresses .isEmpty ();
8172 }
8273
8374 /**
84- * Returns all addresses.
75+ * Returns all addresses in an unmodifiable set .
8576 *
8677 * @return set of all InetSocketAddress
8778 */
@@ -121,26 +112,29 @@ public void addAddress(InetSocketAddress address) {
121112 }
122113
123114 /**
124- * Returns reachable address. If none is reachable than throws exception.
115+ * Returns a reachable address. If none is reachable than throws exception.
116+ * The function is nondeterministic in the sense that the result of calling this function
117+ * twice with the same set of reachable addresses might lead to different results.
125118 *
126119 * @return address which is reachable.
127- * @throws NoRouteToHostException if none address is reachable
120+ * @throws NoRouteToHostException if none of the addresses are reachable
128121 */
129122 public InetSocketAddress getReachableAddress () throws NoRouteToHostException {
130- AtomicReference <InetSocketAddress > address = new AtomicReference <>(null );
131- getInetSocketAddressStream ().forEach (addr -> checkIfAddressIsReachableAndSet (addr , address ));
132-
133- if (address .get () != null ) {
134- return address .get ();
135- } else {
136- throw new NoRouteToHostException ("No valid address among " + addresses );
137- }
123+ // using parallelStream() + findAny() will help to minimize the time spent, but
124+ return addresses .parallelStream ()
125+ .filter (this ::checkIfAddressIsReachable )
126+ .findAny ()
127+ .orElseThrow (() -> new NoRouteToHostException ("No valid address among " + addresses ));
138128 }
139129
140130 /**
141- * Returns reachable address or first one, if none is reachable.
131+ * Returns a reachable address or an arbitrary one, if none is reachable. It throws an exception
132+ * if there are no addresses registered. The function is nondeterministic in the sense that the
133+ * result of calling this function twice with the same set of reachable addresses might lead
134+ * to different results.
142135 *
143136 * @return address which is reachable or fist one.
137+ * @throws NoSuchElementException if there is no address registered
144138 */
145139 public InetSocketAddress getReachableOrOne () {
146140 InetSocketAddress address ;
@@ -153,37 +147,38 @@ public InetSocketAddress getReachableOrOne() {
153147 }
154148
155149 /**
156- * Performs a DNS lookup for addresses.
150+ * Performs a parallel DNS lookup for all addresses.
157151 *
158- * If the DNS lookup fails, than address remain unmodified.
152+ * If the DNS lookup fails, then address remain unmodified.
159153 */
160154 public void recreateSocketAddresses () {
161- Set < InetSocketAddress > temp = Collections . newSetFromMap ( new ConcurrentHashMap <>());
162- temp . addAll ( getInetSocketAddressStream (). map (this ::recreateSocketAddress ). collect ( Collectors . toSet ()));
163- addresses = temp ;
155+ addresses = addresses . parallelStream ()
156+ . map (this ::recreateSocketAddress )
157+ . collect ( Collectors . toCollection ( MultipleAddresses :: newConcurrentHashSet )) ;
164158 }
165159
166160 /**
167- * Returns first address from set.
161+ * Returns an address from the set.
168162 *
169163 * @return address from a set.
164+ * @throws NoSuchElementException if there is no address registered
170165 */
171166 public InetSocketAddress getOne () {
172167 return addresses .iterator ().next ();
173168 }
174169
175- private void checkIfAddressIsReachableAndSet (InetSocketAddress address ,
176- AtomicReference <InetSocketAddress > reachableAddress ) {
177- for (int i = 0 ; i < 5 && reachableAddress .get () == null ; i ++) {
178- try {
179- if (address .getAddress ().isReachable ((i + 1 ) * timeout )) {
180- reachableAddress .compareAndSet (null , address );
181- break ;
182- }
183- Thread .sleep (timeout );
184- } catch (NullPointerException | IOException | InterruptedException ignored ) {
170+ private boolean checkIfAddressIsReachable (InetSocketAddress address ) {
171+ if (address .isUnresolved ()) {
172+ return false ;
173+ }
174+ try {
175+ if (address .getAddress ().isReachable ((int ) timeout .toMillis ())) {
176+ return true ;
185177 }
178+ } catch (IOException ignored ) {
179+ // ignore, we don't really care if we can't reach it for timeout or for IO problems
186180 }
181+ return false ;
187182 }
188183
189184 private InetSocketAddress recreateSocketAddress (InetSocketAddress address ) {
@@ -194,14 +189,6 @@ private InetSocketAddress recreateSocketAddress(InetSocketAddress address) {
194189 }
195190 }
196191
197- private Stream <InetSocketAddress > getInetSocketAddressStream () {
198- if (addresses .size () > 1 ) {
199- return addresses .parallelStream ();
200- } else {
201- return addresses .stream ();
202- }
203- }
204-
205192 @ Override
206193 public boolean equals (Object o ) {
207194 if (this == o ) {
0 commit comments