Skip to content

Commit 45b6c0f

Browse files
symatMate Szalay-Beko
authored andcommitted
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
2 parents 4b6bcea + 945167c commit 45b6c0f

11 files changed

Lines changed: 230 additions & 65 deletions

File tree

zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,16 @@ property, when available, is noted below.
728728
by default with a value of 400, set to 0 or a negative
729729
integer to turn the feature off.
730730

731+
* *maxGetChildrenResponseCacheSize* :
732+
(Java system property: **zookeeper.maxGetChildrenResponseCacheSize**)
733+
**New in 3.6.0:**
734+
Similar to **maxResponseCacheSize**, but applies to get children
735+
requests. The metrics **response_packet_get_children_cache_hits**
736+
and **response_packet_get_children_cache_misses** can be used to tune
737+
this value to a given workload. The feature is turned on
738+
by default with a value of 400, set to 0 or a negative
739+
integer to turn the feature off.
740+
731741
* *autopurge.snapRetainCount* :
732742
(No Java system property)
733743
**New in 3.4.0:**

zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public void close(DisconnectReason reason) {
6161
}
6262

6363
@Override
64-
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
64+
public void sendResponse(ReplyHeader h, Record r, String tag,
65+
String cacheKey, Stat stat, int opCode) throws IOException {
6566
}
6667

6768
@Override

zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -585,19 +585,32 @@ public void processRequest(Request request) {
585585
updateStats(request, lastOp, lastZxid);
586586

587587
try {
588-
if (request.type == OpCode.getData && path != null && rsp != null) {
589-
// Serialized read responses could be cached by the connection object.
590-
// Cache entries are identified by their path and last modified zxid,
591-
// so these values are passed along with the response.
592-
GetDataResponse getDataResponse = (GetDataResponse) rsp;
588+
if (path == null || rsp == null) {
589+
cnxn.sendResponse(hdr, rsp, "response");
590+
} else {
591+
int opCode = request.type;
593592
Stat stat = null;
594-
if (getDataResponse.getStat() != null) {
595-
stat = getDataResponse.getStat();
593+
// Serialized read and get children responses could be cached by the connection
594+
// object. Cache entries are identified by their path and last modified zxid,
595+
// so these values are passed along with the response.
596+
switch (opCode) {
597+
case OpCode.getData : {
598+
GetDataResponse getDataResponse = (GetDataResponse) rsp;
599+
stat = getDataResponse.getStat();
600+
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
601+
break;
602+
}
603+
case OpCode.getChildren2 : {
604+
GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
605+
stat = getChildren2Response.getStat();
606+
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
607+
break;
608+
}
609+
default:
610+
cnxn.sendResponse(hdr, rsp, "response");
596611
}
597-
cnxn.sendResponse(hdr, rsp, "response", path, stat);
598-
} else {
599-
cnxn.sendResponse(hdr, rsp, "response");
600612
}
613+
601614
if (request.type == OpCode.closeSession) {
602615
cnxn.sendCloseSession();
603616
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.jute.BinaryInputArchive;
3636
import org.apache.jute.Record;
3737
import org.apache.zookeeper.WatchedEvent;
38+
import org.apache.zookeeper.ZooDefs;
3839
import org.apache.zookeeper.data.Id;
3940
import org.apache.zookeeper.data.Stat;
4041
import org.apache.zookeeper.proto.ReplyHeader;
@@ -663,31 +664,10 @@ public static void closeSock(SocketChannel sock) {
663664

664665
private static final ByteBuffer packetSentinel = ByteBuffer.allocate(0);
665666

666-
/**
667-
* Serializes a ZooKeeper response and enqueues it for sending.
668-
*
669-
* Serializes client response parts and enqueues them into outgoing queue.
670-
*
671-
* If both cache key and last modified zxid are provided, the serialized
672-
* response is caсhed under the provided key, the last modified zxid is
673-
* stored along with the value. A cache entry is invalidated if the
674-
* provided last modified zxid is more recent than the stored one.
675-
*
676-
* Attention: this function is not thread safe, due to caching not being
677-
* thread safe.
678-
*
679-
* @param h reply header
680-
* @param r reply payload, can be null
681-
* @param tag Jute serialization tag, can be null
682-
* @param cacheKey key for caching the serialized payload. a null value
683-
* prvents caching
684-
* @param stat stat information for the the reply payload, used
685-
* for cache invalidation. a value of 0 prevents caching.
686-
*/
687667
@Override
688-
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) {
668+
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
689669
try {
690-
sendBuffer(serialize(h, r, tag, cacheKey, stat));
670+
sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
691671
decrOutstandingAndCheckThrottle(h);
692672
} catch (Exception e) {
693673
LOG.warn("Unexpected exception. Destruction averted.", e);
@@ -712,7 +692,10 @@ public void process(WatchedEvent event) {
712692
// Convert WatchedEvent to a type that can be sent over the wire
713693
WatcherEvent e = event.getWrapper();
714694

715-
sendResponse(h, e, "notification", null, null);
695+
// The last parameter OpCode here is used to select the response cache.
696+
// Passing OpCode.error (with a value of -1) means we don't care, as we don't need
697+
// response cache on delivering watcher events.
698+
sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
716699
}
717700

718701
/*

zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,14 @@ public void process(WatchedEvent event) {
167167
}
168168

169169
@Override
170-
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
170+
public void sendResponse(ReplyHeader h, Record r, String tag,
171+
String cacheKey, Stat stat, int opCode) throws IOException {
171172
// cacheKey and stat are used in caching, which is not
172173
// implemented here. Implementation example can be found in NIOServerCnxn.
173174
if (closingChannel || !channel.isOpen()) {
174175
return;
175176
}
176-
sendBuffer(serialize(h, r, tag, cacheKey, stat));
177+
sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
177178
decrOutstandingAndCheckThrottle(h);
178179
}
179180

zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,31 @@
2222
import java.util.LinkedHashMap;
2323
import java.util.Map;
2424
import org.apache.zookeeper.data.Stat;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
@SuppressWarnings("serial")
2729
public class ResponseCache {
30+
private static final Logger LOG = LoggerFactory.getLogger(ResponseCache.class);
2831

2932
// Magic number chosen to be "big enough but not too big"
30-
private static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;
31-
33+
public static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;
34+
private final int cacheSize;
3235
private static class Entry {
33-
3436
public Stat stat;
3537
public byte[] data;
36-
3738
}
3839

39-
private Map<String, Entry> cache = Collections.synchronizedMap(new LRUCache<String, Entry>(getResponseCacheSize()));
40+
private final Map<String, Entry> cache;
4041

41-
public ResponseCache() {
42+
public ResponseCache(int cacheSize) {
43+
this.cacheSize = cacheSize;
44+
cache = Collections.synchronizedMap(new LRUCache<>(cacheSize));
45+
LOG.info("Response cache size is initialized with value {}.", cacheSize);
46+
}
47+
48+
public int getCacheSize() {
49+
return cacheSize;
4250
}
4351

4452
public void put(String path, byte[] data, Stat stat) {
@@ -62,12 +70,8 @@ public byte[] get(String key, Stat stat) {
6270
}
6371
}
6472

65-
private static int getResponseCacheSize() {
66-
return Integer.getInteger("zookeeper.maxResponseCacheSize", DEFAULT_RESPONSE_CACHE_SIZE);
67-
}
68-
69-
public static boolean isEnabled() {
70-
return getResponseCacheSize() > 0;
73+
public boolean isEnabled() {
74+
return cacheSize > 0;
7175
}
7276

7377
private static class LRUCache<K, V> extends LinkedHashMap<K, V> {

zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
import org.apache.zookeeper.Quotas;
4141
import org.apache.zookeeper.WatchedEvent;
4242
import org.apache.zookeeper.Watcher;
43+
import org.apache.zookeeper.ZooDefs.OpCode;
4344
import org.apache.zookeeper.data.Id;
4445
import org.apache.zookeeper.data.Stat;
46+
import org.apache.zookeeper.metrics.Counter;
4547
import org.apache.zookeeper.proto.ReplyHeader;
4648
import org.apache.zookeeper.proto.RequestHeader;
4749
import org.slf4j.Logger;
@@ -161,10 +163,34 @@ public void decrOutstandingAndCheckThrottle(ReplyHeader h) {
161163

162164
public abstract void close(DisconnectReason reason);
163165

164-
public abstract void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException;
166+
/**
167+
* Serializes a ZooKeeper response and enqueues it for sending.
168+
*
169+
* Serializes client response parts and enqueues them into outgoing queue.
170+
*
171+
* If both cache key and last modified zxid are provided, the serialized
172+
* response is caсhed under the provided key, the last modified zxid is
173+
* stored along with the value. A cache entry is invalidated if the
174+
* provided last modified zxid is more recent than the stored one.
175+
*
176+
* Attention: this function is not thread safe, due to caching not being
177+
* thread safe.
178+
*
179+
* @param h reply header
180+
* @param r reply payload, can be null
181+
* @param tag Jute serialization tag, can be null
182+
* @param cacheKey Key for caching the serialized payload. A null value prevents caching.
183+
* @param stat Stat information for the the reply payload, used for cache invalidation.
184+
* A value of 0 prevents caching.
185+
* @param opCode The op code appertains to the corresponding request of the response,
186+
* used to decide which cache (e.g. read response cache,
187+
* list of children response cache, ...) object to look up to when applicable.
188+
*/
189+
public abstract void sendResponse(ReplyHeader h, Record r, String tag,
190+
String cacheKey, Stat stat, int opCode) throws IOException;
165191

166192
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
167-
sendResponse(h, r, tag, null, null);
193+
sendResponse(h, r, tag, null, null, -1);
168194
}
169195

170196
protected byte[] serializeRecord(Record record) throws IOException {
@@ -174,11 +200,30 @@ protected byte[] serializeRecord(Record record) throws IOException {
174200
return baos.toByteArray();
175201
}
176202

177-
protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
203+
protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
204+
String cacheKey, Stat stat, int opCode) throws IOException {
178205
byte[] header = serializeRecord(h);
179206
byte[] data = null;
180207
if (r != null) {
181-
ResponseCache cache = zkServer.getReadResponseCache();
208+
ResponseCache cache = null;
209+
Counter cacheHit = null, cacheMiss = null;
210+
switch (opCode) {
211+
case OpCode.getData : {
212+
cache = zkServer.getReadResponseCache();
213+
cacheHit = ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_HITS;
214+
cacheMiss = ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_MISSING;
215+
break;
216+
}
217+
case OpCode.getChildren2 : {
218+
cache = zkServer.getGetChildrenResponseCache();
219+
cacheHit = ServerMetrics.getMetrics().RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS;
220+
cacheMiss = ServerMetrics.getMetrics().RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING;
221+
break;
222+
}
223+
default:
224+
// op codes where response cache is not supported.
225+
}
226+
182227
if (cache != null && stat != null && cacheKey != null && !cacheKey.endsWith(Quotas.statNode)) {
183228
// Use cache to get serialized data.
184229
//
@@ -189,9 +234,9 @@ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag, String cac
189234
// Cache miss, serialize the response and put it in cache.
190235
data = serializeRecord(r);
191236
cache.put(cacheKey, data, stat);
192-
ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_MISSING.add(1);
237+
cacheMiss.add(1);
193238
} else {
194-
ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_HITS.add(1);
239+
cacheHit.add(1);
195240
}
196241
} else {
197242
data = serializeRecord(r);

zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ private ServerMetrics(MetricsProvider metricsProvider) {
9595
NODE_CHANGED_WATCHER = metricsContext.getSummary("node_changed_watch_count", DetailLevel.BASIC);
9696
NODE_CHILDREN_WATCHER = metricsContext.getSummary("node_children_watch_count", DetailLevel.BASIC);
9797

98-
9998
/*
10099
* Number of dead watchers in DeadWatcherListener
101100
*/
@@ -106,6 +105,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {
106105

107106
RESPONSE_PACKET_CACHE_HITS = metricsContext.getCounter("response_packet_cache_hits");
108107
RESPONSE_PACKET_CACHE_MISSING = metricsContext.getCounter("response_packet_cache_misses");
108+
RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS = metricsContext.getCounter("response_packet_get_children_cache_hits");
109+
RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING = metricsContext.getCounter("response_packet_get_children_cache_misses");
109110

110111
ENSEMBLE_AUTH_SUCCESS = metricsContext.getCounter("ensemble_auth_success");
111112

@@ -338,8 +339,14 @@ private ServerMetrics(MetricsProvider metricsProvider) {
338339
public final Counter DEAD_WATCHERS_QUEUED;
339340
public final Counter DEAD_WATCHERS_CLEARED;
340341
public final Summary DEAD_WATCHERS_CLEANER_LATENCY;
342+
343+
/*
344+
* Response cache hit and miss metrics.
345+
*/
341346
public final Counter RESPONSE_PACKET_CACHE_HITS;
342347
public final Counter RESPONSE_PACKET_CACHE_MISSING;
348+
public final Counter RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS;
349+
public final Counter RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING;
343350

344351
/**
345352
* Learner handler quorum packet metrics.

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {
163163
private FileTxnSnapLog txnLogFactory = null;
164164
private ZKDatabase zkDb;
165165
private ResponseCache readResponseCache;
166+
private ResponseCache getChildrenResponseCache;
166167
private final AtomicLong hzxid = new AtomicLong(0);
167168
public static final Exception ok = new Exception("No prob");
168169
protected RequestProcessor firstProcessor;
@@ -217,6 +218,9 @@ protected enum State {
217218
public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
218219
public static final int intBufferStartingSizeBytes;
219220

221+
public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
222+
public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
223+
220224
static {
221225
long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
222226
setFlushDelay(configuredFlushDelay);
@@ -306,7 +310,13 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio
306310

307311
listener = new ZooKeeperServerListenerImpl(this);
308312

309-
readResponseCache = new ResponseCache();
313+
readResponseCache = new ResponseCache(Integer.getInteger(
314+
GET_DATA_RESPONSE_CACHE_SIZE,
315+
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
316+
317+
getChildrenResponseCache = new ResponseCache(Integer.getInteger(
318+
GET_CHILDREN_RESPONSE_CACHE_SIZE,
319+
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
310320

311321
this.initialConfig = initialConfig;
312322

@@ -1764,6 +1774,10 @@ public ResponseCache getReadResponseCache() {
17641774
return isResponseCachingEnabled ? readResponseCache : null;
17651775
}
17661776

1777+
public ResponseCache getGetChildrenResponseCache() {
1778+
return isResponseCachingEnabled ? getChildrenResponseCache : null;
1779+
}
1780+
17671781
protected void registerMetrics() {
17681782
MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
17691783

zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public void close(DisconnectReason reason) {
4646
}
4747

4848
@Override
49-
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
49+
public void sendResponse(ReplyHeader h, Record r, String tag,
50+
String cacheKey, Stat stat, int opCode) throws IOException {
5051
}
5152

5253
@Override

0 commit comments

Comments
 (0)