Skip to content

Commit 2d648cf

Browse files
fix: add a timeout on retry for retryable errors (#1930)
* fix: add a timeout on retry for retryable errors * . * fix clirr * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Update README.md * Update README.md * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * . * . Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent cf08ff8 commit 2d648cf

10 files changed

Lines changed: 134 additions & 10 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.0'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.0"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
6666
```
6767

6868
## Authentication

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,14 @@
7676
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
7777
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
7878
</difference>
79+
<difference>
80+
<differenceType>7004</differenceType>
81+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
82+
<method>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
83+
</difference>
84+
<difference>
85+
<differenceType>7004</differenceType>
86+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
87+
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
88+
</difference>
7989
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.grpc.Status.Code;
3232
import io.grpc.StatusRuntimeException;
3333
import java.io.IOException;
34+
import java.time.Duration;
3435
import java.util.Comparator;
3536
import java.util.Deque;
3637
import java.util.HashMap;
@@ -61,6 +62,7 @@ public class ConnectionWorker implements AutoCloseable {
6162
private Lock lock;
6263
private Condition hasMessageInWaitingQueue;
6364
private Condition inflightReduced;
65+
private static Duration maxRetryDuration = Duration.ofMinutes(5);
6466

6567
/*
6668
* The identifier of the current stream to write to. This stream name can change during
@@ -114,6 +116,9 @@ public class ConnectionWorker implements AutoCloseable {
114116
@GuardedBy("lock")
115117
private long conectionRetryCountWithoutCallback = 0;
116118

119+
@GuardedBy("lock")
120+
private long connectionRetryStartTime = 0;
121+
117122
/*
118123
* If false, streamConnection needs to be reset.
119124
*/
@@ -201,6 +206,7 @@ public ConnectionWorker(
201206
ProtoSchema writerSchema,
202207
long maxInflightRequests,
203208
long maxInflightBytes,
209+
Duration maxRetryDuration,
204210
FlowController.LimitExceededBehavior limitExceededBehavior,
205211
String traceId,
206212
BigQueryWriteClient client,
@@ -210,6 +216,7 @@ public ConnectionWorker(
210216
this.hasMessageInWaitingQueue = lock.newCondition();
211217
this.inflightReduced = lock.newCondition();
212218
this.streamName = streamName;
219+
this.maxRetryDuration = maxRetryDuration;
213220
if (writerSchema == null) {
214221
throw new StatusRuntimeException(
215222
Status.fromCode(Code.INVALID_ARGUMENT)
@@ -237,6 +244,7 @@ public void run() {
237244
}
238245

239246
private void resetConnection() {
247+
log.info("Reconnecting for stream:" + streamName);
240248
this.streamConnection =
241249
new StreamConnection(
242250
this.client,
@@ -618,6 +626,9 @@ private void requestCallback(AppendRowsResponse response) {
618626
if (conectionRetryCountWithoutCallback != 0) {
619627
conectionRetryCountWithoutCallback = 0;
620628
}
629+
if (connectionRetryStartTime != 0) {
630+
connectionRetryStartTime = 0;
631+
}
621632
if (!this.inflightRequestQueue.isEmpty()) {
622633
requestWrapper = pollInflightRequestQueue();
623634
} else if (inflightCleanuped) {
@@ -686,15 +697,25 @@ private void doneCallback(Throwable finalStatus) {
686697
try {
687698
this.streamConnectionIsConnected = false;
688699
if (connectionFinalStatus == null) {
700+
if (connectionRetryStartTime == 0) {
701+
connectionRetryStartTime = System.currentTimeMillis();
702+
}
689703
// If the error can be retried, don't set it here, let it try to retry later on.
690-
if (isRetriableError(finalStatus) && !userClosed) {
704+
if (isRetriableError(finalStatus)
705+
&& !userClosed
706+
&& (maxRetryDuration.toMillis() == 0f
707+
|| System.currentTimeMillis() - connectionRetryStartTime
708+
<= maxRetryDuration.toMillis())) {
691709
this.conectionRetryCountWithoutCallback++;
692710
log.info(
693711
"Retriable error "
694712
+ finalStatus.toString()
695713
+ " received, retry count "
696714
+ conectionRetryCountWithoutCallback
697-
+ " for stream "
715+
+ ", millis left to retry "
716+
+ (maxRetryDuration.toMillis()
717+
- (System.currentTimeMillis() - connectionRetryStartTime))
718+
+ ", for stream "
698719
+ streamName);
699720
} else {
700721
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public class ConnectionWorkerPool {
5757
*/
5858
private final long maxInflightBytes;
5959

60+
/*
61+
* Max retry duration for retryable errors.
62+
*/
63+
private final java.time.Duration maxRetryDuration;
64+
6065
/*
6166
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
6267
*/
@@ -196,12 +201,14 @@ public abstract static class Builder {
196201
public ConnectionWorkerPool(
197202
long maxInflightRequests,
198203
long maxInflightBytes,
204+
java.time.Duration maxRetryDuration,
199205
FlowController.LimitExceededBehavior limitExceededBehavior,
200206
String traceId,
201207
BigQueryWriteClient client,
202208
boolean ownsBigQueryWriteClient) {
203209
this.maxInflightRequests = maxInflightRequests;
204210
this.maxInflightBytes = maxInflightBytes;
211+
this.maxRetryDuration = maxRetryDuration;
205212
this.limitExceededBehavior = limitExceededBehavior;
206213
this.traceId = traceId;
207214
this.client = client;
@@ -356,6 +363,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
356363
writeSchema,
357364
maxInflightRequests,
358365
maxInflightBytes,
366+
maxRetryDuration,
359367
limitExceededBehavior,
360368
traceId,
361369
client,

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.grpc.Status.Code;
3030
import io.grpc.StatusRuntimeException;
3131
import java.io.IOException;
32+
import java.time.Duration;
3233
import java.util.Map;
3334
import java.util.Objects;
3435
import java.util.UUID;
@@ -193,6 +194,7 @@ private StreamWriter(Builder builder) throws IOException {
193194
builder.writerSchema,
194195
builder.maxInflightRequest,
195196
builder.maxInflightBytes,
197+
builder.maxRetryDuration,
196198
builder.limitExceededBehavior,
197199
builder.traceId,
198200
getBigQueryWriteClient(builder),
@@ -251,6 +253,7 @@ private StreamWriter(Builder builder) throws IOException {
251253
return new ConnectionWorkerPool(
252254
builder.maxInflightRequest,
253255
builder.maxInflightBytes,
256+
builder.maxRetryDuration,
254257
builder.limitExceededBehavior,
255258
builder.traceId,
256259
client,
@@ -494,6 +497,8 @@ public static final class Builder {
494497

495498
private boolean enableConnectionPool = false;
496499

500+
private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);
501+
497502
private Builder(String streamName) {
498503
this.streamName = Preconditions.checkNotNull(streamName);
499504
this.client = null;
@@ -602,6 +607,15 @@ public Builder setLimitExceededBehavior(
602607
return this;
603608
}
604609

610+
/*
611+
* Max duration to retry on retryable errors. Default is 5 minutes. You can allow unlimited
612+
* retry by setting the value to be 0.
613+
*/
614+
public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
615+
this.maxRetryDuration = maxRetryDuration;
616+
return this;
617+
}
618+
605619
/** Builds the {@code StreamWriterV2}. */
606620
public StreamWriter build() throws IOException {
607621
return new StreamWriter(this);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ private void testSendRequestsToMultiTable(
153153
.setMaxConnectionsPerRegion(maxConnections)
154154
.build());
155155
ConnectionWorkerPool connectionWorkerPool =
156-
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
156+
createConnectionWorkerPool(
157+
maxRequests, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
157158

158159
// Sets the sleep time to simulate requests stuck in connection.
159160
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -206,7 +207,8 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
206207
ConnectionWorkerPool.setOptions(
207208
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
208209
ConnectionWorkerPool connectionWorkerPool =
209-
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
210+
createConnectionWorkerPool(
211+
/*maxRequests=*/ 3, /*maxBytes=*/ 1000, java.time.Duration.ofSeconds(5));
210212

211213
// Sets the sleep time to simulate requests stuck in connection.
212214
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -255,7 +257,8 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
255257
ConnectionWorkerPool.setOptions(
256258
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
257259
ConnectionWorkerPool connectionWorkerPool =
258-
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
260+
createConnectionWorkerPool(
261+
/*maxRequests=*/ 3, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
259262

260263
// Sets the sleep time to simulate requests stuck in connection.
261264
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -368,11 +371,13 @@ private ProtoRows createProtoRows(String[] messages) {
368371
return rowsBuilder.build();
369372
}
370373

371-
ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes) {
374+
ConnectionWorkerPool createConnectionWorkerPool(
375+
long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
372376
ConnectionWorkerPool.enableTestingLogic();
373377
return new ConnectionWorkerPool(
374378
maxRequests,
375379
maxBytes,
380+
maxRetryDuration,
376381
FlowController.LimitExceededBehavior.Block,
377382
TEST_TRACE_ID,
378383
client,

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,16 +290,23 @@ private AppendRowsResponse createAppendResponse(long offset) {
290290

291291
private ConnectionWorker createConnectionWorker() throws IOException {
292292
// By default use only the first table as table reference.
293-
return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100, 1000);
293+
return createConnectionWorker(
294+
TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
294295
}
295296

296297
private ConnectionWorker createConnectionWorker(
297-
String streamName, String traceId, long maxRequests, long maxBytes) throws IOException {
298+
String streamName,
299+
String traceId,
300+
long maxRequests,
301+
long maxBytes,
302+
java.time.Duration maxRetryDuration)
303+
throws IOException {
298304
return new ConnectionWorker(
299305
streamName,
300306
createProtoSchema("foo"),
301307
maxRequests,
302308
maxBytes,
309+
maxRetryDuration,
303310
FlowController.LimitExceededBehavior.Block,
304311
TEST_TRACE_ID,
305312
client,

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public void setTimesToClose(long numberTimesToClose) {
9191
serviceImpl.setTimesToClose(numberTimesToClose);
9292
}
9393

94+
public void setCloseForeverAfter(long closeForeverAfter) {
95+
serviceImpl.setCloseForeverAfter(closeForeverAfter);
96+
}
97+
9498
public long getConnectionCount() {
9599
return serviceImpl.getConnectionCount();
96100
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
5757
private long closeAfter = 0;
5858
private long recordCount = 0;
5959
private long connectionCount = 0;
60+
private long closeForeverAfter = 0;
6061

6162
// Record whether the first record has been seen on a connection.
6263
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
@@ -177,6 +178,9 @@ public void onNext(AppendRowsRequest value) {
177178
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
178179
LOG.info("Shutting down connection from test...");
179180
responseObserver.onError(Status.ABORTED.asException());
181+
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
182+
LOG.info("Shutting down connection from test...");
183+
responseObserver.onError(Status.ABORTED.asException());
180184
} else {
181185
final Response response = responses.get(offset);
182186
sendResponse(response, responseObserver);
@@ -279,4 +283,10 @@ public void setCloseEveryNAppends(long closeAfter) {
279283
public void setTimesToClose(long numberTimesToClose) {
280284
this.numberTimesToClose = numberTimesToClose;
281285
}
286+
287+
/* The connection will forever return failure after numberTimesToClose. This option shouldn't
288+
* be used together with setCloseEveryNAppends and setTimesToClose*/
289+
public void setCloseForeverAfter(long closeForeverAfter) {
290+
this.closeForeverAfter = closeForeverAfter;
291+
}
282292
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.api.gax.core.NoCredentialsProvider;
2626
import com.google.api.gax.grpc.testing.MockGrpcService;
2727
import com.google.api.gax.grpc.testing.MockServiceHelper;
28+
import com.google.api.gax.rpc.AbortedException;
2829
import com.google.api.gax.rpc.ApiException;
2930
import com.google.api.gax.rpc.StatusCode.Code;
3031
import com.google.api.gax.rpc.UnknownException;
@@ -129,13 +130,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
129130
.setTraceId(TEST_TRACE_ID)
130131
.setLocation("US")
131132
.setEnableConnectionPool(true)
133+
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
132134
.build();
133135
}
134136

135137
private StreamWriter getTestStreamWriter() throws IOException {
136138
return StreamWriter.newBuilder(TEST_STREAM_1, client)
137139
.setWriterSchema(createProtoSchema())
138140
.setTraceId(TEST_TRACE_ID)
141+
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
139142
.build();
140143
}
141144

@@ -884,6 +887,48 @@ public void testAppendWithResetSuccess() throws Exception {
884887
}
885888
}
886889

890+
@Test
891+
public void testAppendWithResetNeverSuccess() throws Exception {
892+
try (StreamWriter writer = getTestStreamWriter()) {
893+
testBigQueryWrite.setCloseForeverAfter(1);
894+
long appendCount = 100;
895+
for (long i = 0; i < appendCount; i++) {
896+
testBigQueryWrite.addResponse(createAppendResponse(i));
897+
}
898+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
899+
for (long i = 0; i < appendCount; i++) {
900+
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
901+
}
902+
// first request succeeded.
903+
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
904+
// after 5 seconds, the requests will bail out.
905+
for (int i = 1; i < appendCount; i++) {
906+
assertFutureException(AbortedException.class, futures.get(i));
907+
}
908+
}
909+
}
910+
911+
@Test
912+
public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
913+
try (StreamWriter writer = getMultiplexingTestStreamWriter()) {
914+
testBigQueryWrite.setCloseForeverAfter(1);
915+
long appendCount = 100;
916+
for (long i = 0; i < appendCount; i++) {
917+
testBigQueryWrite.addResponse(createAppendResponse(i));
918+
}
919+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
920+
for (long i = 0; i < appendCount; i++) {
921+
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
922+
}
923+
// first request succeeded.
924+
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
925+
// after 5 seconds, the requests will bail out.
926+
for (int i = 1; i < appendCount; i++) {
927+
assertFutureException(AbortedException.class, futures.get(i));
928+
}
929+
}
930+
}
931+
887932
// This test is setup for the server to force a retry after all records are sent. Ensure the
888933
// records are resent, even if no new records are appeneded.
889934
@Test

0 commit comments

Comments
 (0)