|
22 | 22 | import com.google.common.base.Preconditions; |
23 | 23 | import com.google.common.util.concurrent.Uninterruptibles; |
24 | 24 | import io.grpc.Status; |
| 25 | +import io.grpc.Status.Code; |
25 | 26 | import io.grpc.StatusRuntimeException; |
26 | 27 | import java.util.Deque; |
27 | 28 | import java.util.LinkedList; |
|
39 | 40 | * |
40 | 41 | * <p>TODO: Attach schema. |
41 | 42 | * |
42 | | - * <p>TODO: Add max size check. |
43 | | - * |
44 | 43 | * <p>TODO: Add inflight control. |
45 | 44 | * |
46 | 45 | * <p>TODO: Attach traceId. |
@@ -94,6 +93,11 @@ public class StreamWriterV2 implements AutoCloseable { |
94 | 93 | */ |
95 | 94 | private Thread appendThread; |
96 | 95 |
|
| 96 | + /** The maximum size of one request. Defined by the API. */ |
| 97 | + public static long getApiMaxRequestBytes() { |
| 98 | + return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte) |
| 99 | + } |
| 100 | + |
97 | 101 | private StreamWriterV2(Builder builder) { |
98 | 102 | this.lock = new ReentrantLock(); |
99 | 103 | this.hasMessageInWaitingQueue = lock.newCondition(); |
@@ -154,6 +158,17 @@ public void run() { |
154 | 158 | */ |
155 | 159 | public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) { |
156 | 160 | AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); |
| 161 | + if (requestWrapper.messageSize > getApiMaxRequestBytes()) { |
| 162 | + requestWrapper.appendResult.setException( |
| 163 | + new StatusRuntimeException( |
| 164 | + Status.fromCode(Code.INVALID_ARGUMENT) |
| 165 | + .withDescription( |
| 166 | + "MessageSize is too large. Max allow: " |
| 167 | + + getApiMaxRequestBytes() |
| 168 | + + " Actual: " |
| 169 | + + requestWrapper.messageSize))); |
| 170 | + return requestWrapper.appendResult; |
| 171 | + } |
157 | 172 | this.lock.lock(); |
158 | 173 | try { |
159 | 174 | if (userClosed) { |
@@ -355,10 +370,12 @@ public StreamWriterV2 build() { |
355 | 370 | private static final class AppendRequestAndResponse { |
356 | 371 | final SettableApiFuture<AppendRowsResponse> appendResult; |
357 | 372 | final AppendRowsRequest message; |
| 373 | + final long messageSize; |
358 | 374 |
|
359 | 375 | AppendRequestAndResponse(AppendRowsRequest message) { |
360 | 376 | this.appendResult = SettableApiFuture.create(); |
361 | 377 | this.message = message; |
| 378 | + this.messageSize = message.getProtoRows().getSerializedSize(); |
362 | 379 | } |
363 | 380 | } |
364 | 381 | } |
0 commit comments