@@ -46,15 +46,25 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
4646
4747 public static final String HANDLER_NAME = "frameDecoder" ;
4848 private static final int LENGTH_SIZE = 8 ;
49+ private static final int MAX_FRAME_SIZE = Integer .MAX_VALUE ;
4950 private static final int UNKNOWN_FRAME_SIZE = -1 ;
5051
5152 private final LinkedList <ByteBuf > buffers = new LinkedList <>();
5253 private final ByteBuf frameLenBuf = Unpooled .buffer (LENGTH_SIZE , LENGTH_SIZE );
54+ private final boolean isSupportLargeData ;
5355
5456 private long totalSize = 0 ;
5557 private long nextFrameSize = UNKNOWN_FRAME_SIZE ;
5658 private volatile Interceptor interceptor ;
5759
60+ public TransportFrameDecoder () {
61+ this (true );
62+ }
63+
64+ public TransportFrameDecoder (boolean isSupportLargeData ) {
65+ this .isSupportLargeData = isSupportLargeData ;
66+ }
67+
5868 @ Override
5969 public void channelRead (ChannelHandlerContext ctx , Object data ) throws Exception {
6070 ByteBuf in = (ByteBuf ) data ;
@@ -77,7 +87,13 @@ public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception
7787 totalSize -= read ;
7888 } else {
7989 // Interceptor is not active, so try to decode one frame.
80- LinkedList <ByteBuf > frame = decodeNext ();
90+ Object frame ;
91+ if (isSupportLargeData ) {
92+ frame = decodeList ();
93+ } else {
94+ frame = decodeByteBuf ();
95+ }
96+
8197 if (frame == null ) {
8298 break ;
8399 }
@@ -120,7 +136,36 @@ private long decodeFrameSize() {
120136 return nextFrameSize ;
121137 }
122138
123- private LinkedList <ByteBuf > decodeNext () throws Exception {
139+ private ByteBuf decodeByteBuf () throws Exception {
140+ long frameSize = decodeFrameSize ();
141+ if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize ) {
142+ return null ;
143+ }
144+
145+ // Reset size for next frame.
146+ nextFrameSize = UNKNOWN_FRAME_SIZE ;
147+
148+ Preconditions .checkArgument (frameSize < MAX_FRAME_SIZE , "Too large frame: %s" , frameSize );
149+ Preconditions .checkArgument (frameSize > 0 , "Frame length should be positive: %s" , frameSize );
150+
151+ // If the first buffer holds the entire frame, return it.
152+ int remaining = (int ) frameSize ;
153+ if (buffers .getFirst ().readableBytes () >= remaining ) {
154+ return nextBufferForFrame (remaining );
155+ }
156+
157+ // Otherwise, create a composite buffer.
158+ CompositeByteBuf frame = buffers .getFirst ().alloc ().compositeBuffer (Integer .MAX_VALUE );
159+ while (remaining > 0 ) {
160+ ByteBuf next = nextBufferForFrame (remaining );
161+ remaining -= next .readableBytes ();
162+ frame .addComponent (next ).writerIndex (frame .writerIndex () + next .readableBytes ());
163+ }
164+ assert remaining == 0 ;
165+ return frame ;
166+ }
167+
168+ private LinkedList <ByteBuf > decodeList () throws Exception {
124169 long frameSize = decodeFrameSize ();
125170 if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize ) {
126171 return null ;
0 commit comments