Skip to content

Commit 642e345

Browse files
authored
feat: Support gRPC Compression (#2197)
* Support gRPC compression * Ignore error for method signature change * Clean some debugging leftovers * Remove exception from clirr-ignored-differences.xml to try and get rid of the conflict with googleapis/java-bigquerystorage#2192 * Remove excessive arg verification and leave it only in the StreamWriter
1 parent 4897c05 commit 642e345

10 files changed

Lines changed: 121 additions & 5 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.logging.Logger;
5454
import java.util.regex.Matcher;
5555
import java.util.regex.Pattern;
56+
import javax.annotation.Nullable;
5657
import javax.annotation.concurrent.GuardedBy;
5758

5859
/**
@@ -119,6 +120,10 @@ class ConnectionWorker implements AutoCloseable {
119120
*/
120121
private final String traceId;
121122

123+
/*
124+
* Enables compression on the wire.
125+
*/
126+
private String compressorName = null;
122127
/*
123128
* Tracks current inflight requests in the stream.
124129
*/
@@ -253,6 +258,7 @@ public ConnectionWorker(
253258
Duration maxRetryDuration,
254259
FlowController.LimitExceededBehavior limitExceededBehavior,
255260
String traceId,
261+
@Nullable String compressorName,
256262
BigQueryWriteSettings clientSettings)
257263
throws IOException {
258264
this.lock = new ReentrantLock();
@@ -274,6 +280,7 @@ public ConnectionWorker(
274280
this.traceId = traceId;
275281
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
276282
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
283+
this.compressorName = compressorName;
277284
// Always recreate a client for connection worker.
278285
HashMap<String, String> newHeaders = new HashMap<>();
279286
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
@@ -343,7 +350,8 @@ public void run(AppendRowsResponse response) {
343350
public void run(Throwable finalStatus) {
344351
doneCallback(finalStatus);
345352
}
346-
});
353+
},
354+
this.compressorName);
347355
log.info("Finish connecting stream: " + streamName + " id: " + writerId);
348356
}
349357

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.logging.Logger;
4242
import java.util.regex.Matcher;
4343
import java.util.regex.Pattern;
44+
import javax.annotation.Nullable;
4445
import javax.annotation.concurrent.GuardedBy;
4546

4647
/** Pool of connections to accept appends and distirbute to different connections. */
@@ -91,6 +92,10 @@ public class ConnectionWorkerPool {
9192
* TraceId for debugging purpose.
9293
*/
9394
private final String traceId;
95+
/*
96+
* Sets the compression to use for the calls
97+
*/
98+
private String compressorName;
9499

95100
/** Used for test on the number of times createWorker is called. */
96101
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
@@ -199,12 +204,14 @@ public abstract static class Builder {
199204
java.time.Duration maxRetryDuration,
200205
FlowController.LimitExceededBehavior limitExceededBehavior,
201206
String traceId,
207+
@Nullable String comperssorName,
202208
BigQueryWriteSettings clientSettings) {
203209
this.maxInflightRequests = maxInflightRequests;
204210
this.maxInflightBytes = maxInflightBytes;
205211
this.maxRetryDuration = maxRetryDuration;
206212
this.limitExceededBehavior = limitExceededBehavior;
207213
this.traceId = traceId;
214+
this.compressorName = comperssorName;
208215
this.clientSettings = clientSettings;
209216
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
210217
}
@@ -379,6 +386,7 @@ private ConnectionWorker createConnectionWorker(
379386
maxRetryDuration,
380387
limitExceededBehavior,
381388
traceId,
389+
compressorName,
382390
clientSettings);
383391
connectionWorkerPool.add(connectionWorker);
384392
log.info(

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,17 @@ public Builder setLocation(String location) {
336336
return this;
337337
}
338338

339+
/**
340+
* Sets the compression to use for the calls. The compressor must be of type gzip.
341+
*
342+
* @param compressorName
343+
* @return Builder
344+
*/
345+
public Builder setCompressorName(String compressorName) {
346+
this.schemaAwareStreamWriterBuilder.setCompressorName(compressorName);
347+
return this;
348+
}
349+
339350
/**
340351
* Builds JsonStreamWriter
341352
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
6262
private Descriptor descriptor;
6363
private TableSchema tableSchema;
6464
private ProtoSchema protoSchema;
65+
private String CompressorName;
6566

6667
// During some sitaution we want to skip stream writer refresh for updated schema. e.g. when
6768
// the user provides the table schema, we should always use that schema.
@@ -92,7 +93,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
9293
builder.endpoint,
9394
builder.flowControlSettings,
9495
builder.traceIdBase,
95-
builder.traceId);
96+
builder.traceId,
97+
builder.compressorName);
9698
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
9799
streamWriterBuilder.setLocation(builder.location);
98100
this.streamWriter = streamWriterBuilder.build();
@@ -276,7 +278,8 @@ private void setStreamWriterSettings(
276278
@Nullable String endpoint,
277279
@Nullable FlowControlSettings flowControlSettings,
278280
@Nullable String traceIdBase,
279-
@Nullable String traceId) {
281+
@Nullable String traceId,
282+
@Nullable String compressorName) {
280283
if (channelProvider != null) {
281284
streamWriterBuilder.setChannelProvider(channelProvider);
282285
}
@@ -316,6 +319,9 @@ private void setStreamWriterSettings(
316319
flowControlSettings.getLimitExceededBehavior());
317320
}
318321
}
322+
if (compressorName != null) {
323+
streamWriterBuilder.setCompressorName(compressorName);
324+
}
319325
}
320326

321327
/**
@@ -425,6 +431,7 @@ public static final class Builder<T> {
425431
// Indicates whether multiplexing mode is enabled.
426432
private boolean enableConnectionPool = false;
427433
private String location;
434+
private String compressorName;
428435

429436
private static final String streamPatternString =
430437
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -609,6 +616,17 @@ public Builder<T> setLocation(String location) {
609616
return this;
610617
}
611618

619+
/**
620+
* Sets the compression to use for the calls. The compressor must be of type gzip.
621+
*
622+
* @param compressorName
623+
* @return Builder
624+
*/
625+
public Builder<T> setCompressorName(String compressorName) {
626+
this.compressorName = compressorName;
627+
return this;
628+
}
629+
612630
/**
613631
* Builds SchemaAwareStreamWriter
614632
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@
1515
*/
1616
package com.google.cloud.bigquery.storage.v1;
1717

18+
import com.google.api.gax.grpc.GrpcCallContext;
19+
import com.google.api.gax.rpc.ApiCallContext;
1820
import com.google.api.gax.rpc.BidiStreamingCallable;
1921
import com.google.api.gax.rpc.ClientStream;
2022
import com.google.api.gax.rpc.ResponseObserver;
2123
import com.google.api.gax.rpc.StreamController;
24+
import io.grpc.CallOptions;
2225
import io.grpc.Status;
2326
import io.grpc.Status.Code;
2427
import io.grpc.StatusRuntimeException;
28+
import java.util.logging.Logger;
29+
import javax.annotation.Nullable;
2530

2631
/**
2732
* StreamConnection is responsible for writing requests to a GRPC bidirecional connection.
@@ -43,11 +48,24 @@ class StreamConnection {
4348
private RequestCallback requestCallback;
4449
private DoneCallback doneCallback;
4550

51+
private static final Logger log = Logger.getLogger(StreamConnection.class.getName());
52+
4653
public StreamConnection(
47-
BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) {
54+
BigQueryWriteClient client,
55+
RequestCallback requestCallback,
56+
DoneCallback doneCallback,
57+
@Nullable String compressorName) {
4858
this.requestCallback = requestCallback;
4959
this.doneCallback = doneCallback;
5060

61+
ApiCallContext apiCallContext = null;
62+
if (compressorName != null) {
63+
apiCallContext =
64+
GrpcCallContext.createDefault()
65+
.withCallOptions(CallOptions.DEFAULT.withCompression(compressorName));
66+
log.info("gRPC compression is enabled with " + compressorName + " compression");
67+
}
68+
5169
bidiStreamingCallable = client.appendRowsCallable();
5270
clientStream =
5371
bidiStreamingCallable.splitCall(
@@ -75,7 +93,8 @@ public void onComplete() {
7593
Status.fromCode(Code.CANCELLED)
7694
.withDescription("Stream is closed by user.")));
7795
}
78-
});
96+
},
97+
apiCallContext);
7998
}
8099

81100
/**

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ private StreamWriter(Builder builder) throws IOException {
215215
builder.maxRetryDuration,
216216
builder.limitExceededBehavior,
217217
builder.traceId,
218+
builder.compressorName,
218219
clientSettings));
219220
} else {
220221
if (!isDefaultStream(streamName)) {
@@ -276,6 +277,7 @@ private StreamWriter(Builder builder) throws IOException {
276277
builder.maxRetryDuration,
277278
builder.limitExceededBehavior,
278279
builder.traceId,
280+
builder.compressorName,
279281
client.getSettings());
280282
}));
281283
validateFetchedConnectonPool(builder);
@@ -598,6 +600,8 @@ public static final class Builder {
598600

599601
private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);
600602

603+
private String compressorName = null;
604+
601605
private Builder(String streamName) {
602606
this.streamName = Preconditions.checkNotNull(streamName);
603607
this.client = null;
@@ -716,6 +720,16 @@ public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
716720
return this;
717721
}
718722

723+
public Builder setCompressorName(String compressorName) {
724+
Preconditions.checkNotNull(compressorName);
725+
Preconditions.checkArgument(
726+
compressorName.equals("gzip"),
727+
"Compression of type \"%s\" isn't supported, only \"gzip\" compression is supported.",
728+
compressorName);
729+
this.compressorName = compressorName;
730+
return this;
731+
}
732+
719733
/** Builds the {@code StreamWriterV2}. */
720734
public StreamWriter build() throws IOException {
721735
return new StreamWriter(this);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
477477
maxRetryDuration,
478478
FlowController.LimitExceededBehavior.Block,
479479
TEST_TRACE_ID,
480+
null,
480481
clientSettings);
481482
}
482483
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ public void testAppendButInflightQueueFull() throws Exception {
333333
Duration.ofSeconds(100),
334334
FlowController.LimitExceededBehavior.Block,
335335
TEST_TRACE_ID,
336+
null,
336337
client.getSettings());
337338
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
338339
ConnectionWorker.setMaxInflightQueueWaitTime(500);
@@ -388,6 +389,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
388389
Duration.ofSeconds(100),
389390
FlowController.LimitExceededBehavior.Block,
390391
TEST_TRACE_ID,
392+
null,
391393
client.getSettings());
392394
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
393395
ConnectionWorker.setMaxInflightQueueWaitTime(500);
@@ -451,6 +453,7 @@ public void testLocationMismatch() throws Exception {
451453
Duration.ofSeconds(100),
452454
FlowController.LimitExceededBehavior.Block,
453455
TEST_TRACE_ID,
456+
null,
454457
client.getSettings());
455458
StatusRuntimeException ex =
456459
assertThrows(
@@ -481,6 +484,7 @@ public void testStreamNameMismatch() throws Exception {
481484
Duration.ofSeconds(100),
482485
FlowController.LimitExceededBehavior.Block,
483486
TEST_TRACE_ID,
487+
null,
484488
client.getSettings());
485489
StatusRuntimeException ex =
486490
assertThrows(
@@ -532,6 +536,7 @@ private ConnectionWorker createConnectionWorker(
532536
maxRetryDuration,
533537
FlowController.LimitExceededBehavior.Block,
534538
TEST_TRACE_ID,
539+
null,
535540
client.getSettings());
536541
}
537542

@@ -625,6 +630,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
625630
Duration.ofSeconds(100),
626631
FlowController.LimitExceededBehavior.Block,
627632
TEST_TRACE_ID,
633+
null,
628634
client.getSettings());
629635
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
630636

@@ -681,6 +687,7 @@ public void testLongTimeIdleWontFail() throws Exception {
681687
Duration.ofSeconds(100),
682688
FlowController.LimitExceededBehavior.Block,
683689
TEST_TRACE_ID,
690+
null,
684691
client.getSettings());
685692

686693
long appendCount = 10;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,4 +1452,20 @@ public void testAppendWithMissingValueMap() throws Exception {
14521452
missingValueMap);
14531453
}
14541454
}
1455+
1456+
@Test
1457+
public void testWrongCompressionType() throws Exception {
1458+
IllegalArgumentException ex =
1459+
assertThrows(
1460+
IllegalArgumentException.class,
1461+
() -> {
1462+
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
1463+
.setCompressorName("not-gzip")
1464+
.build();
1465+
});
1466+
assertTrue(
1467+
ex.getMessage()
1468+
.contains(
1469+
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
1470+
}
14551471
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,20 @@ public void testMessageTooLarge() throws Exception {
949949
writer.close();
950950
}
951951

952+
@Test
953+
public void testWrongCompressionType() throws Exception {
954+
IllegalArgumentException ex =
955+
assertThrows(
956+
IllegalArgumentException.class,
957+
() -> {
958+
StreamWriter.newBuilder(TEST_STREAM_1, client).setCompressorName("not-gzip").build();
959+
});
960+
assertTrue(
961+
ex.getMessage()
962+
.contains(
963+
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
964+
}
965+
952966
@Test
953967
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
954968
ProtoSchema schema1 = createProtoSchema("foo");

0 commit comments

Comments
 (0)