Skip to content

Commit a7fbb6f

Browse files
authored
JdkZlibDecoder: accumulate decompressed output before firing channelRead (#16510)
Motivation: A recent change (9d804c5) changed JdkZlibDecoder to fire ctx.fireChannelRead() on every inflate iteration (~8KB output) when maxAllocation is 0. For a typical ~150KB HTTP response this produces ~19 small buffer allocations and ~19 pipeline dispatches through the internal EmbeddedChannel used by HttpContentDecoder, causing a 30-35% throughput regression even in aggregated mode (where chunk count is irrelevant downstream). Modifications: Accumulate decompressed output up to 64KB (DEFAULT_MAX_FORWARD_BYTES) before firing ctx.fireChannelRead(). The buffer grows naturally via prepareDecompressBuffer() until the threshold, then fires and starts a new buffer. Any remaining data fires in the finally block as before. Memory per in-flight buffer is bounded to 64KB regardless of the compressed input size. Result: Throughput is restored to pre-regression levels. Chunks per response drop from ~163 to ~6 for a 150KB payload.
1 parent 7937553 commit a7fbb6f

5 files changed

Lines changed: 51 additions & 10 deletions

File tree

codec-compression/src/main/java/io/netty/handler/codec/compression/BrotliDecoder.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
*/
3333
public final class BrotliDecoder extends ByteToMessageDecoder {
3434

35+
private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
36+
3537
private enum State {
3638
DONE, NEEDS_MORE_INPUT, ERROR
3739
}
@@ -48,6 +50,7 @@ private enum State {
4850
private DecoderJNI.Wrapper decoder;
4951
private boolean destroyed;
5052
private boolean needsRead;
53+
private ByteBuf accumBuffer;
5154

5255
/**
5356
* Creates a new BrotliDecoder with a default 8kB input buffer
@@ -67,10 +70,25 @@ public BrotliDecoder(int inputBufferSize) {
6770
private void forwardOutput(ChannelHandlerContext ctx) {
6871
ByteBuffer nativeBuffer = decoder.pull();
6972
// nativeBuffer actually wraps brotli's internal buffer so we need to copy its content
70-
ByteBuf copy = ctx.alloc().buffer(nativeBuffer.remaining());
71-
copy.writeBytes(nativeBuffer);
73+
int remaining = nativeBuffer.remaining();
74+
if (accumBuffer == null) {
75+
accumBuffer = ctx.alloc().buffer(remaining);
76+
}
77+
accumBuffer.writeBytes(nativeBuffer);
7278
needsRead = false;
73-
ctx.fireChannelRead(copy);
79+
if (accumBuffer.readableBytes() >= DEFAULT_MAX_FORWARD_BYTES) {
80+
ctx.fireChannelRead(accumBuffer);
81+
accumBuffer = null;
82+
}
83+
}
84+
85+
private void flushAccumBuffer(ChannelHandlerContext ctx) {
86+
if (accumBuffer != null && accumBuffer.isReadable()) {
87+
ctx.fireChannelRead(accumBuffer);
88+
} else if (accumBuffer != null) {
89+
accumBuffer.release();
90+
}
91+
accumBuffer = null;
7492
}
7593

7694
private State decompress(ChannelHandlerContext ctx, ByteBuf input) {
@@ -145,6 +163,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
145163
} catch (Exception e) {
146164
destroy();
147165
throw e;
166+
} finally {
167+
flushAccumBuffer(ctx);
148168
}
149169
}
150170

codec-compression/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
package io.netty.handler.codec.compression;
1717

1818
import io.netty.buffer.ByteBuf;
19+
import io.netty.util.internal.SystemPropertyUtil;
1920

2021
import java.nio.ByteBuffer;
2122

2223
final class CompressionUtil {
2324

25+
static final int DEFAULT_MAX_FORWARD_BYTES = SystemPropertyUtil.getInt(
26+
"io.netty.compression.defaultMaxForwardBytes", 64 * 1024);
27+
2428
private CompressionUtil() { }
2529

2630
static void checkChecksum(ByteBufChecksum checksum, ByteBuf uncompressed, int currentChecksum) {

codec-compression/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class JZlibDecoder extends ZlibDecoder {
2828

2929
private final Inflater z = new Inflater();
3030
private byte[] dictionary;
31+
private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
32+
private final int maxForwardBytes;
3133
private boolean needsRead;
3234
private volatile boolean finished;
3335

@@ -78,6 +80,7 @@ public JZlibDecoder(ZlibWrapper wrapper) {
7880
*/
7981
public JZlibDecoder(ZlibWrapper wrapper, int maxAllocation) {
8082
super(maxAllocation);
83+
this.maxForwardBytes = maxAllocation > 0 ? maxAllocation : DEFAULT_MAX_FORWARD_BYTES;
8184

8285
ObjectUtil.checkNotNull(wrapper, "wrapper");
8386

@@ -113,6 +116,7 @@ public JZlibDecoder(byte[] dictionary) {
113116
*/
114117
public JZlibDecoder(byte[] dictionary, int maxAllocation) {
115118
super(maxAllocation);
119+
this.maxForwardBytes = maxAllocation > 0 ? maxAllocation : DEFAULT_MAX_FORWARD_BYTES;
116120
this.dictionary = ObjectUtil.checkNotNull(dictionary, "dictionary");
117121
int resultCode;
118122
resultCode = z.inflateInit(JZlib.W_ZLIB);
@@ -174,7 +178,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
174178
int outputLength = z.next_out_index - oldNextOutIndex;
175179
if (outputLength > 0) {
176180
decompressed.writerIndex(decompressed.writerIndex() + outputLength);
177-
if (maxAllocation == 0) {
181+
if (maxAllocation == 0 && decompressed.readableBytes() >= maxForwardBytes) {
178182
// If we don't limit the maximum allocations we should just
179183
// forward the buffer directly.
180184
ByteBuf buffer = decompressed;

codec-compression/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ private enum GzipState {
5959
private int xlen = -1;
6060
private boolean needsRead;
6161

62+
private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
63+
private final int maxForwardBytes;
64+
6265
private volatile boolean finished;
6366

6467
private boolean decideZlibOrNone;
@@ -161,6 +164,7 @@ public JdkZlibDecoder(boolean decompressConcatenated, int maxAllocation) {
161164

162165
private JdkZlibDecoder(ZlibWrapper wrapper, byte[] dictionary, boolean decompressConcatenated, int maxAllocation) {
163166
super(maxAllocation);
167+
this.maxForwardBytes = maxAllocation > 0 ? maxAllocation : DEFAULT_MAX_FORWARD_BYTES;
164168

165169
ObjectUtil.checkNotNull(wrapper, "wrapper");
166170

@@ -265,9 +269,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
265269
if (crc != null) {
266270
crc.update(outArray, outIndex, outputLength);
267271
}
268-
if (maxAllocation == 0) {
269-
// If we don't limit the maximum allocations we should just
270-
// forward the buffer directly.
272+
if (maxAllocation == 0 && decompressed.readableBytes() >= maxForwardBytes) {
273+
// Forward the buffer once it exceeds the threshold to bound memory
274+
// while avoiding excessive fireChannelRead calls.
271275
ByteBuf buffer = decompressed;
272276
decompressed = null;
273277
needsRead = false;

codec-compression/src/main/java/io/netty/handler/codec/compression/ZstdDecoder.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ public final class ZstdDecoder extends ByteToMessageDecoder {
4141
}
4242
}
4343

44+
private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
45+
4446
private final int maximumAllocationSize;
47+
private final int maxForwardBytes;
4548
private final MutableByteBufInputStream inputStream = new MutableByteBufInputStream();
4649
private ZstdInputStreamNoFinalizer zstdIs;
4750

@@ -62,6 +65,7 @@ public ZstdDecoder() {
6265

6366
public ZstdDecoder(int maximumAllocationSize) {
6467
this.maximumAllocationSize = ObjectUtil.checkPositiveOrZero(maximumAllocationSize, "maximumAllocationSize");
68+
this.maxForwardBytes = maximumAllocationSize > 0 ? maximumAllocationSize : DEFAULT_MAX_FORWARD_BYTES;
6569
}
6670

6771
@Override
@@ -101,13 +105,18 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
101105
}
102106
do {
103107
w = outBuffer.writeBytes(zstdIs, outBuffer.writableBytes());
104-
} while (w != -1 && outBuffer.isWritable());
105-
if (outBuffer.isReadable()) {
108+
} while (w > 0 && outBuffer.isWritable());
109+
if (!outBuffer.isWritable() || outBuffer.readableBytes() >= maxForwardBytes) {
106110
needsRead = false;
107111
ctx.fireChannelRead(outBuffer);
108112
outBuffer = null;
109113
}
110-
} while (w != -1);
114+
} while (w > 0);
115+
if (outBuffer != null && outBuffer.isReadable()) {
116+
needsRead = false;
117+
ctx.fireChannelRead(outBuffer);
118+
outBuffer = null;
119+
}
111120
} finally {
112121
if (outBuffer != null) {
113122
outBuffer.release();

0 commit comments

Comments
 (0)