Skip to content

Commit 8713a5b

Browse files
author
Mate Szalay-Beko
committed
ZOOKEEPER-3188: add fixes for PR comments
1 parent 05eae83 commit 8713a5b

3 files changed

Lines changed: 58 additions & 67 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ public void run() {
452452
LOG.error("Interrupted while sleeping. Ignoring exception", ie);
453453
} finally {
454454
closeSockets();
455+
executor.shutdownNow();
455456
}
456457
}
457458
}
@@ -522,7 +523,7 @@ private void acceptConnections() throws IOException {
522523
try {
523524
socket.close();
524525
} catch (IOException e) {
525-
LOG.warn("Error closing socket", e);
526+
LOG.warn("Error closing socket: " + socket, e);
526527
}
527528
}
528529
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,8 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
257257
* @param addr - the address of the Peer to connect to.
258258
* @throws IOException - if the socket connection fails on the 5th attempt
259259
* if there is an authentication failure while connecting to leader
260-
* @throws X509Exception
261-
* @throws InterruptedException
262260
*/
263-
protected void connectToLeader(MultipleAddresses addr, String hostname)
264-
throws IOException, InterruptedException {
261+
protected void connectToLeader(MultipleAddresses addr, String hostname) throws IOException {
265262

266263
this.leaderAddr = addr;
267264
Set<InetSocketAddress> addresses = addr.getAllAddresses();
@@ -270,7 +267,13 @@ protected void connectToLeader(MultipleAddresses addr, String hostname)
270267
AtomicReference<Socket> socket = new AtomicReference<>(null);
271268
addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
272269

273-
latch.await();
270+
try {
271+
latch.await();
272+
} catch (InterruptedException e) {
273+
LOG.warn("Interrupted while trying to connect to Leader", e);
274+
} finally {
275+
executor.shutdownNow();
276+
}
274277

275278
if (socket.get() == null) {
276279
throw new IOException("Failed connect to " + addr);

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java

Lines changed: 48 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,70 +18,61 @@
1818

1919
package org.apache.zookeeper.server.quorum;
2020

21+
import static java.util.Arrays.asList;
2122
import java.io.IOException;
2223
import java.net.InetAddress;
2324
import java.net.InetSocketAddress;
2425
import java.net.NoRouteToHostException;
2526
import java.net.UnknownHostException;
27+
import java.time.Duration;
28+
import java.util.Collection;
2629
import java.util.Collections;
2730
import java.util.List;
31+
import java.util.NoSuchElementException;
2832
import java.util.Objects;
2933
import java.util.Set;
3034
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.atomic.AtomicReference;
3235
import java.util.stream.Collectors;
33-
import java.util.stream.Stream;
3436

3537
/**
3638
* This class allows to store several quorum and electing addresses.
3739
*
3840
* See ZOOKEEPER-3188 for a discussion of this feature.
3941
*/
40-
public class MultipleAddresses {
41-
private static final int DEFAULT_TIMEOUT = 100;
42+
public final class MultipleAddresses {
43+
private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(500);
44+
45+
private static Set<InetSocketAddress> newConcurrentHashSet() {
46+
return Collections.newSetFromMap(new ConcurrentHashMap<>());
47+
}
4248

4349
private Set<InetSocketAddress> addresses;
44-
private int timeout;
50+
private final Duration timeout;
4551

4652
public MultipleAddresses() {
47-
addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
48-
timeout = DEFAULT_TIMEOUT;
53+
this(Collections.emptyList());
4954
}
5055

51-
public MultipleAddresses(List<InetSocketAddress> addresses) {
56+
public MultipleAddresses(Collection<InetSocketAddress> addresses) {
5257
this(addresses, DEFAULT_TIMEOUT);
5358
}
5459

5560
public MultipleAddresses(InetSocketAddress address) {
56-
this(address, DEFAULT_TIMEOUT);
61+
this(asList(address), DEFAULT_TIMEOUT);
5762
}
5863

59-
public MultipleAddresses(List<InetSocketAddress> addresses, int timeout) {
60-
this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
64+
public MultipleAddresses(Collection<InetSocketAddress> addresses, Duration timeout) {
65+
this.addresses = newConcurrentHashSet();
6166
this.addresses.addAll(addresses);
6267
this.timeout = timeout;
6368
}
6469

65-
public MultipleAddresses(InetSocketAddress address, int timeout) {
66-
addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
67-
addresses.add(address);
68-
this.timeout = timeout;
69-
}
70-
71-
public int getTimeout() {
72-
return timeout;
73-
}
74-
75-
public void setTimeout(int timeout) {
76-
this.timeout = timeout;
77-
}
78-
7970
public boolean isEmpty() {
8071
return addresses.isEmpty();
8172
}
8273

8374
/**
84-
* Returns all addresses.
75+
* Returns all addresses in an unmodifiable set.
8576
*
8677
* @return set of all InetSocketAddress
8778
*/
@@ -121,26 +112,29 @@ public void addAddress(InetSocketAddress address) {
121112
}
122113

123114
/**
124-
* Returns reachable address. If none is reachable than throws exception.
115+
* Returns a reachable address. If none is reachable than throws exception.
116+
* The function is nondeterministic in the sense that the result of calling this function
117+
* twice with the same set of reachable addresses might lead to different results.
125118
*
126119
* @return address which is reachable.
127-
* @throws NoRouteToHostException if none address is reachable
120+
* @throws NoRouteToHostException if none of the addresses are reachable
128121
*/
129122
public InetSocketAddress getReachableAddress() throws NoRouteToHostException {
130-
AtomicReference<InetSocketAddress> address = new AtomicReference<>(null);
131-
getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address));
132-
133-
if (address.get() != null) {
134-
return address.get();
135-
} else {
136-
throw new NoRouteToHostException("No valid address among " + addresses);
137-
}
123+
// using parallelStream() + findAny() will help to minimize the time spent, but
124+
return addresses.parallelStream()
125+
.filter(this::checkIfAddressIsReachable)
126+
.findAny()
127+
.orElseThrow(() -> new NoRouteToHostException("No valid address among " + addresses));
138128
}
139129

140130
/**
141-
* Returns reachable address or first one, if none is reachable.
131+
* Returns a reachable address or an arbitrary one, if none is reachable. It throws an exception
132+
* if there are no addresses registered. The function is nondeterministic in the sense that the
133+
* result of calling this function twice with the same set of reachable addresses might lead
134+
* to different results.
142135
*
143136
* @return address which is reachable or fist one.
137+
* @throws NoSuchElementException if there is no address registered
144138
*/
145139
public InetSocketAddress getReachableOrOne() {
146140
InetSocketAddress address;
@@ -153,37 +147,38 @@ public InetSocketAddress getReachableOrOne() {
153147
}
154148

155149
/**
156-
* Performs a DNS lookup for addresses.
150+
* Performs a parallel DNS lookup for all addresses.
157151
*
158-
* If the DNS lookup fails, than address remain unmodified.
152+
* If the DNS lookup fails, then address remain unmodified.
159153
*/
160154
public void recreateSocketAddresses() {
161-
Set<InetSocketAddress> temp = Collections.newSetFromMap(new ConcurrentHashMap<>());
162-
temp.addAll(getInetSocketAddressStream().map(this::recreateSocketAddress).collect(Collectors.toSet()));
163-
addresses = temp;
155+
addresses = addresses.parallelStream()
156+
.map(this::recreateSocketAddress)
157+
.collect(Collectors.toCollection(MultipleAddresses::newConcurrentHashSet));
164158
}
165159

166160
/**
167-
* Returns first address from set.
161+
* Returns an address from the set.
168162
*
169163
* @return address from a set.
164+
* @throws NoSuchElementException if there is no address registered
170165
*/
171166
public InetSocketAddress getOne() {
172167
return addresses.iterator().next();
173168
}
174169

175-
private void checkIfAddressIsReachableAndSet(InetSocketAddress address,
176-
AtomicReference<InetSocketAddress> reachableAddress) {
177-
for (int i = 0; i < 5 && reachableAddress.get() == null; i++) {
178-
try {
179-
if (address.getAddress().isReachable((i + 1) * timeout)) {
180-
reachableAddress.compareAndSet(null, address);
181-
break;
182-
}
183-
Thread.sleep(timeout);
184-
} catch (NullPointerException | IOException | InterruptedException ignored) {
170+
private boolean checkIfAddressIsReachable(InetSocketAddress address) {
171+
if (address.isUnresolved()) {
172+
return false;
173+
}
174+
try {
175+
if (address.getAddress().isReachable((int) timeout.toMillis())) {
176+
return true;
185177
}
178+
} catch (IOException ignored) {
179+
// ignore, we don't really care if we can't reach it for timeout or for IO problems
186180
}
181+
return false;
187182
}
188183

189184
private InetSocketAddress recreateSocketAddress(InetSocketAddress address) {
@@ -194,14 +189,6 @@ private InetSocketAddress recreateSocketAddress(InetSocketAddress address) {
194189
}
195190
}
196191

197-
private Stream<InetSocketAddress> getInetSocketAddressStream() {
198-
if (addresses.size() > 1) {
199-
return addresses.parallelStream();
200-
} else {
201-
return addresses.stream();
202-
}
203-
}
204-
205192
@Override
206193
public boolean equals(Object o) {
207194
if (this == o) {

0 commit comments

Comments
 (0)