Skip to content

Commit 7bfbe7e

Browse files
committed
ZOOKEEPER-3188: Improve resilience to network
1 parent abf1865 commit 7bfbe7e

22 files changed

Lines changed: 1090 additions & 465 deletions

zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.zookeeper.server.quorum.Observer;
2222
import org.apache.zookeeper.server.quorum.ObserverMXBean;
2323
import org.apache.zookeeper.server.quorum.QuorumPeer;
24+
import java.net.InetSocketAddress;
2425

2526
/**
2627
* ObserverBean
@@ -49,10 +50,11 @@ public String getQuorumAddress() {
4950

5051
public String getLearnerMaster() {
5152
QuorumPeer.QuorumServer learnerMaster = observer.getCurrentLearnerMaster();
52-
if (learnerMaster == null || learnerMaster.addr == null) {
53+
InetSocketAddress address = learnerMaster.addr.getReachableOrOne();
54+
if (learnerMaster == null || address == null) {
5355
return "Unknown";
5456
}
55-
return learnerMaster.addr.getAddress().getHostAddress() + ":" + learnerMaster.addr.getPort();
57+
return address.getAddress().getHostAddress() + ":" + address.getPort();
5658
}
5759

5860
public void setLearnerMaster(String learnerMaster) {

zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,8 @@
1818

1919
package org.apache.zookeeper.server.admin;
2020

21-
import java.util.Arrays;
22-
import java.util.Collections;
23-
import java.util.HashMap;
24-
import java.util.HashSet;
25-
import java.util.List;
26-
import java.util.Map;
27-
import java.util.Properties;
28-
import java.util.Set;
29-
import java.util.SortedMap;
30-
import java.util.TreeMap;
21+
import java.net.InetSocketAddress;
22+
import java.util.*;
3123
import java.util.stream.Collectors;
3224

3325
import com.fasterxml.jackson.annotation.JsonAnyGetter;
@@ -644,10 +636,8 @@ private static class VotingView {
644636
this.view = view.entrySet().stream()
645637
.filter(e -> e.getValue().addr != null)
646638
.collect(Collectors.toMap(Map.Entry::getKey,
647-
e -> String.format("%s:%d%s:%s%s",
648-
QuorumPeer.QuorumServer.delimitedHostString(e.getValue().addr),
649-
e.getValue().addr.getPort(),
650-
e.getValue().electionAddr == null ? "" : ":" + e.getValue().electionAddr.getPort(),
639+
e -> String.format("%s:%s%s",
640+
getMultiAddressString(e.getValue()),
651641
e.getValue().type.equals(QuorumPeer.LearnerType.PARTICIPANT) ? "participant" : "observer",
652642
e.getValue().clientAddr ==null || e.getValue().isClientAddrFromStatic ? "" :
653643
String.format(";%s:%d",
@@ -657,11 +647,31 @@ private static class VotingView {
657647
TreeMap::new));
658648
}
659649

650+
private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
651+
return qs.addr.getAllAddresses().stream()
652+
.map(address -> getSingleAddressString(qs, address))
653+
.collect(Collectors.joining(","));
654+
}
655+
656+
private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) {
657+
final String addressHostString = address.getHostString();
658+
final String delimitedHostString = QuorumPeer.QuorumServer.delimitedHostString(address);
659+
660+
Optional<InetSocketAddress> matchingElectionAddress = qs.electionAddr.getAllAddresses().stream()
661+
.filter(electionAddress -> electionAddress.getHostString().equals(addressHostString))
662+
.findFirst();
663+
final String electionPort = matchingElectionAddress.map(e-> ":" + e.getPort()).orElse("");
664+
665+
return String.format("%s:%d%s", delimitedHostString, address.getPort(), electionPort);
666+
}
667+
660668
@JsonAnyGetter
661669
public Map<Long, String> getView() {
662670
return view;
663671
}
664672
}
673+
674+
665675
}
666676

667677
/**

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.net.DatagramPacket;
2424
import java.net.DatagramSocket;
25+
import java.net.InetAddress;
2526
import java.net.InetSocketAddress;
2627
import java.net.SocketException;
2728
import java.nio.ByteBuffer;
@@ -44,8 +45,6 @@
4445

4546
import org.apache.zookeeper.jmx.MBeanRegistry;
4647
import org.apache.zookeeper.server.ZooKeeperThread;
47-
import org.apache.zookeeper.server.quorum.Election;
48-
import org.apache.zookeeper.server.quorum.Vote;
4948
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
5049
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
5150

@@ -732,8 +731,8 @@ private void process(ToSend m) {
732731
}
733732

734733
for (QuorumServer server : self.getVotingView().values()) {
735-
InetSocketAddress saddr = new InetSocketAddress(server.addr
736-
.getAddress(), port);
734+
InetAddress address = server.addr.getReachableOrOne().getAddress();
735+
InetSocketAddress saddr = new InetSocketAddress(address, port);
737736
addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
738737
}
739738

@@ -763,7 +762,7 @@ public AuthFastLeaderElection(QuorumPeer self) {
763762

764763
private void starter(QuorumPeer self) {
765764
this.self = self;
766-
port = self.getVotingView().get(self.getId()).electionAddr.getPort();
765+
port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0);
767766
proposedLeader = -1;
768767
proposedZxid = -1;
769768

@@ -786,11 +785,10 @@ private void leaveInstance() {
786785

787786
private void sendNotifications() {
788787
for (QuorumServer server : self.getView().values()) {
789-
788+
InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne();
790789
ToSend notmsg = new ToSend(ToSend.mType.notification,
791790
AuthFastLeaderElection.sequencer++, proposedLeader,
792-
proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING,
793-
self.getView().get(server.id).electionAddr);
791+
proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, address);
794792

795793
sendqueue.offer(notmsg);
796794
}

0 commit comments

Comments
 (0)