Skip to content

Commit e823af4

Browse files
committed
Merge remote-tracking branch 'origin/master' into ZOOKEEPER-3188
2 parents de7bad2 + b5399da commit e823af4

12 files changed

Lines changed: 670 additions & 4 deletions

File tree

bin/zkServer.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ stop)
212212
else
213213
$KILL $(cat "$ZOOPIDFILE")
214214
rm "$ZOOPIDFILE"
215+
sleep 1
215216
echo STOPPED
216217
fi
217218
exit 0

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
5555
<property name="javacc.version" value="5.0"/>
5656

5757
<property name="jetty.version" value="9.4.18.v20190429"/>
58-
<property name="jackson.version" value="2.9.9.1"/>
58+
<property name="jackson.version" value="2.9.9.3"/>
5959
<property name="dependency-check-ant.version" value="4.0.2"/>
6060

6161
<property name="commons-io.version" value="2.6"/>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@
280280
<commons-cli.version>1.2</commons-cli.version>
281281
<netty.version>4.1.36.Final</netty.version>
282282
<jetty.version>9.4.18.v20190429</jetty.version>
283-
<jackson.version>2.9.9.1</jackson.version>
283+
<jackson.version>2.9.9.3</jackson.version>
284284
<json.version>1.1.1</json.version>
285285
<jline.version>2.11</jline.version>
286286
<snappy.version>1.1.7</snappy.version>

zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,26 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
14941494
minute. This prevents herding during container deletion.
14951495
Default is "10000".
14961496

1497+
<a name="sc_debug_observability_config"></a>
1498+
1499+
#### Debug Observability Configurations
1500+
1501+
**New in 3.6.0:** The following options are introduced to make ZooKeeper easier to debug.
1502+
1503+
* *zookeeper.messageTracker.BufferSize* :
1504+
(Java system property only)
1505+
Controls the maximum number of messages stored in **MessageTracker**. The value should be a positive
1505+
integer. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record the
1507+
last set of messages between a server (follower or observer) and a leader, when a server
1508+
disconnects from the leader. This set of messages will then be dumped to ZooKeeper's log file,
1509+
and will help reconstruct the state of the servers at the time of the disconnection and
1510+
will be useful for debugging purposes.
1511+
1512+
* *zookeeper.messageTracker.Enabled* :
1513+
(Java system property only)
1514+
When set to "true", will enable **MessageTracker** to track and record messages. Default value
1515+
is "false".
1516+
14971517
<a name="sc_adminserver_config"></a>
14981518

14991519
#### AdminServer configuration

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,16 @@ void followLeader() throws InterruptedException {
7474
self.start_fle = 0;
7575
self.end_fle = 0;
7676
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
77+
78+
long connectionTime = 0;
79+
boolean completedSync = false;
80+
7781
try {
7882
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
7983
QuorumServer leaderServer = findLeader();
8084
try {
8185
connectToLeader(leaderServer.addr, leaderServer.hostname);
86+
connectionTime = System.currentTimeMillis();
8287
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
8388
if (self.isReconfigStateChange()) {
8489
throw new Exception("learned about role change");
@@ -99,6 +104,7 @@ void followLeader() throws InterruptedException {
99104
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
100105
syncWithLeader(newEpochZxid);
101106
self.setZabState(QuorumPeer.ZabState.BROADCAST);
107+
completedSync = true;
102108
} finally {
103109
long syncTime = Time.currentElapsedTime() - startTime;
104110
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
@@ -129,6 +135,14 @@ void followLeader() throws InterruptedException {
129135
om.stop();
130136
}
131137
zk.unregisterJMX(this);
138+
139+
if (connectionTime != 0) {
140+
long connectionDuration = System.currentTimeMillis() - connectionTime;
141+
LOG.info("Disconnected from leader (with address: {}). "
142+
+ "Was connected for {}ms. Sync state: {}",
143+
leaderAddr, connectionDuration, completedSync);
144+
messageTracker.dumpToLog(leaderAddr.toString());
145+
}
132146
}
133147
}
134148

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.zookeeper.server.ZooTrace;
5353
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
5454
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
55+
import org.apache.zookeeper.server.util.MessageTracker;
5556
import org.apache.zookeeper.server.util.SerializeUtils;
5657
import org.apache.zookeeper.server.util.ZxidUtils;
5758
import org.apache.zookeeper.txn.SetDataTxn;
@@ -79,6 +80,7 @@ static class PacketInFlight {
7980
protected BufferedOutputStream bufferedOutput;
8081

8182
protected Socket sock;
83+
protected MultipleAddresses leaderAddr;
8284

8385
/**
8486
* Socket getter
@@ -93,6 +95,9 @@ public Socket getSocket() {
9395
/** the protocol version of the leader */
9496
protected int leaderProtocolVersion = 0x01;
9597

98+
private static final int BUFFERED_MESSAGE_SIZE = 10;
99+
protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
100+
96101
protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
97102

98103
/**
@@ -151,6 +156,7 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep
151156
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
152157
synchronized (leaderOs) {
153158
if (pp != null) {
159+
messageTracker.trackSent(pp.getType());
154160
leaderOs.writeRecord(pp, "packet");
155161
}
156162
if (flush) {
@@ -169,6 +175,7 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
169175
void readPacket(QuorumPacket pp) throws IOException {
170176
synchronized (leaderIs) {
171177
leaderIs.readRecord(pp, "packet");
178+
messageTracker.trackReceived(pp.getType());
172179
}
173180
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
174181
if (pp.getType() == Leader.PING) {
@@ -256,6 +263,7 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
256263
protected void connectToLeader(MultipleAddresses addr, String hostname)
257264
throws IOException, InterruptedException {
258265

266+
this.leaderAddr = addr;
259267
Set<InetSocketAddress> addresses = addr.getAllAddresses();
260268
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
261269
CountDownLatch latch = new CountDownLatch(addresses.size());

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.zookeeper.server.quorum.Leader.Proposal;
5050
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
5151
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
52+
import org.apache.zookeeper.server.util.MessageTracker;
5253
import org.apache.zookeeper.server.util.SerializeUtils;
5354
import org.apache.zookeeper.server.util.ZxidUtils;
5455
import org.apache.zookeeper.txn.TxnHeader;
@@ -220,6 +221,8 @@ public boolean equals(Object o) {
220221
private final BufferedInputStream bufferedInput;
221222
private BufferedOutputStream bufferedOutput;
222223

224+
protected final MessageTracker messageTracker;
225+
223226
// for test only
224227
protected void setOutputArchive(BinaryOutputArchive oa) {
225228
this.oa = oa;
@@ -280,6 +283,8 @@ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
280283
}
281284
throw new SaslException("Authentication failure: " + e.getMessage());
282285
}
286+
287+
this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
283288
}
284289

285290
@Override
@@ -349,6 +354,7 @@ private void sendPackets() throws InterruptedException {
349354
}
350355
oa.writeRecord(p, "packet");
351356
packetsSent.incrementAndGet();
357+
messageTracker.trackSent(p.getType());
352358
} catch (IOException e) {
353359
if (!sock.isClosed()) {
354360
LOG.warn("Unexpected exception at " + this, e);
@@ -464,8 +470,11 @@ public void run() {
464470

465471
QuorumPacket qp = new QuorumPacket();
466472
ia.readRecord(qp, "packet");
473+
474+
messageTracker.trackReceived(qp.getType());
467475
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
468476
LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
477+
469478
return;
470479
}
471480

@@ -526,9 +535,11 @@ public void run() {
526535
ByteBuffer.wrap(ver).putInt(0x10000);
527536
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
528537
oa.writeRecord(newEpochPacket, "packet");
538+
messageTracker.trackSent(Leader.LEADERINFO);
529539
bufferedOutput.flush();
530540
QuorumPacket ackEpochPacket = new QuorumPacket();
531541
ia.readRecord(ackEpochPacket, "packet");
542+
messageTracker.trackReceived(ackEpochPacket.getType());
532543
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
533544
LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
534545
return;
@@ -554,6 +565,7 @@ public void run() {
554565
try {
555566
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
556567
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
568+
messageTracker.trackSent(Leader.SNAP);
557569
bufferedOutput.flush();
558570

559571
LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
@@ -600,6 +612,8 @@ public void run() {
600612
*/
601613
qp = new QuorumPacket();
602614
ia.readRecord(qp, "packet");
615+
616+
messageTracker.trackReceived(qp.getType());
603617
if (qp.getType() != Leader.ACK) {
604618
LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp));
605619
return;
@@ -632,6 +646,7 @@ public void run() {
632646
while (true) {
633647
qp = new QuorumPacket();
634648
ia.readRecord(qp, "packet");
649+
messageTracker.trackReceived(qp.getType());
635650

636651
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
637652
if (qp.getType() == Leader.PING) {
@@ -716,7 +731,9 @@ public void run() {
716731
syncThrottler.endSync();
717732
syncThrottler = null;
718733
}
719-
LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
734+
String remoteAddr = getRemoteAddress();
735+
LOG.warn("******* GOODBYE {} ********", remoteAddr);
736+
messageTracker.dumpToLog(remoteAddr);
720737
shutdown();
721738
}
722739
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,14 @@ public String toString() {
9797
*/
9898
void observeLeader() throws Exception {
9999
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
100-
100+
long connectTime = 0;
101+
boolean completedSync = false;
101102
try {
102103
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
103104
QuorumServer master = findLearnerMaster();
104105
try {
105106
connectToLeader(master.addr, master.hostname);
107+
connectTime = System.currentTimeMillis();
106108
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
107109
if (self.isReconfigStateChange()) {
108110
throw new Exception("learned about role change");
@@ -112,6 +114,7 @@ void observeLeader() throws Exception {
112114
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
113115
syncWithLeader(newLeaderZxid);
114116
self.setZabState(QuorumPeer.ZabState.BROADCAST);
117+
completedSync = true;
115118
QuorumPacket qp = new QuorumPacket();
116119
while (this.isRunning() && nextLearnerMaster.get() == null) {
117120
readPacket(qp);
@@ -127,6 +130,14 @@ void observeLeader() throws Exception {
127130
} finally {
128131
currentLearnerMaster = null;
129132
zk.unregisterJMX(this);
133+
if (connectTime != 0) {
134+
long connectionDuration = System.currentTimeMillis() - connectTime;
135+
136+
LOG.info("Disconnected from leader (with address: {}). "
137+
+ "Was connected for {}ms. Sync state: {}",
138+
leaderAddr, connectionDuration, completedSync);
139+
messageTracker.dumpToLog(leaderAddr.toString());
140+
}
130141
}
131142
}
132143

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.server.util;
20+
21+
import java.lang.reflect.Array;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
/**
 * Fixed-capacity FIFO circular buffer.
 *
 * <p>When the buffer is full, a write overwrites the oldest element.
 *
 * <p>Thread safety: every method that touches the backing array
 * ({@link #write}, {@link #take}, {@link #peek}, {@link #reset}) is
 * {@code synchronized} on this instance. {@link #size}, {@link #isEmpty} and
 * {@link #isFull} read an atomic counter without taking the lock; the counter
 * is only ever updated to a value in {@code [0, capacity]}.
 *
 * <p>TODO: make this lock free, as it is called on every quorum message.
 *
 * @param <T> type of the stored elements
 */
public class CircularBuffer<T> {

    private final T[] buffer;
    private final int capacity;
    /** Index of the oldest element; guarded by {@code this}. */
    private int oldest;
    /** Current element count; written only while holding {@code this}. */
    private final AtomicInteger numberOfElements = new AtomicInteger();

    /**
     * Creates an empty buffer.
     *
     * @param clazz component type used to allocate the backing array
     * @param capacity maximum number of elements retained; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not greater than 0
     */
    @SuppressWarnings("unchecked")
    public CircularBuffer(Class<T> clazz, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("CircularBuffer capacity should be greater than 0");
        }
        // Array.newInstance returns Object; the cast is safe because the
        // component type is exactly clazz.
        this.buffer = (T[]) Array.newInstance(clazz, capacity);
        this.capacity = capacity;
    }

    /**
     * Puts the element in the next available slot.
     * If the buffer is full, the oldest element is replaced with the new value.
     *
     * @param element the element to store
     */
    public synchronized void write(T element) {
        int size = numberOfElements.get();
        if (size >= capacity) {
            // Full: overwrite the oldest slot and advance; the count is unchanged,
            // so lock-free readers never observe a size above capacity.
            buffer[oldest] = element;
            oldest = (oldest + 1) % capacity;
        } else {
            buffer[(oldest + size) % capacity] = element;
            numberOfElements.incrementAndGet();
        }
    }

    /**
     * Removes and returns the oldest element (FIFO order).
     *
     * @return the oldest element, or {@code null} if the buffer is empty
     */
    public synchronized T take() {
        if (numberOfElements.get() <= 0) {
            return null;
        }
        T polled = buffer[oldest];
        // Drop the reference so the removed element can be garbage collected.
        buffer[oldest] = null;
        oldest = (oldest + 1) % capacity;
        numberOfElements.decrementAndGet();
        return polled;
    }

    /**
     * Returns the oldest element without removing it.
     *
     * @return the oldest element, or {@code null} if the buffer is empty
     */
    public synchronized T peek() {
        if (numberOfElements.get() <= 0) {
            return null;
        }
        return buffer[oldest];
    }

    /**
     * @return the number of elements currently held
     */
    public int size() {
        return numberOfElements.get();
    }

    /**
     * @return {@code true} if the buffer holds no elements
     */
    public boolean isEmpty() {
        return numberOfElements.get() <= 0;
    }

    /**
     * @return {@code true} if the next write will overwrite the oldest element
     */
    public boolean isFull() {
        return numberOfElements.get() >= capacity;
    }

    /**
     * Empties the buffer and releases the references it was holding.
     */
    public synchronized void reset() {
        for (int i = 0; i < capacity; i++) {
            buffer[i] = null;
        }
        oldest = 0;
        numberOfElements.set(0);
    }
}

0 commit comments

Comments
 (0)