
Commit 35db0fb

fix: catch uncaught exception from append loop and add exponential retry to reconnection (#2015)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client
* feat: add connection worker pool skeleton, used for multiplexing client
* feat: add Load api for connection worker for multiplexing client
* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination
* 🦉 Updates from OwlBot post-processor. See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside fake bigquery write impl for getting the response from offset
* feat: wire multiplexing connection pool to stream writer
* feat: some fixes for multiplexing client
* feat: fix some todos, and reject the mixed behavior of passed in client or not
* feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field
* feat: add getInflightWaitSeconds implementation
* feat: Add schema comparison in connection loop to ensure schema update for the same stream name can be notified
* feat: add schema update support to multiplexing
* fix: fix windows build bug: windows Instant resolution is different with linux
* fix: fix another failing tests for windows build
* fix: fix another test failure for Windows build
* feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens
* fix: add back the background executor provider that's accidentally removed
* feat: throw error when use connection pool for explicit stream
* fix: Add precision truncation to the passed in value from JSON float and double type.
* modify the bom version
* fix deadlock issue in ConnectionWorkerPool
* fix: fix deadlock issue during close + append for multiplexing
* fix: fix one potential root cause of deadlock issue for non-multiplexing case
* Add timeout to inflight queue waiting, and also add some extra log
* feat: allow java client lib handle switch table schema for the same stream name
* fix: close before retry connection

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 66db8fe commit 35db0fb

2 files changed

Lines changed: 114 additions & 0 deletions


google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 47 additions & 0 deletions
@@ -198,6 +198,12 @@ class ConnectionWorker implements AutoCloseable {
    */
   private final String writerId = UUID.randomUUID().toString();
 
+  /*
+   * Test only exception behavior testing params.
+   */
+  private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
+  private long testOnlyAppendLoopSleepTime = 0;
+
   /** The maximum size of one request. Defined by the API. */
   public static long getApiMaxRequestBytes() {
     return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -240,6 +246,25 @@ public void run() {
                 appendLoop();
               }
             });
+    appendThread.setUncaughtExceptionHandler(
+        (Thread t, Throwable e) -> {
+          log.warning(
+              "Exception thrown from append loop, thus stream writer is shutdown due to exception: "
+                  + e.toString());
+          e.printStackTrace();
+          lock.lock();
+          try {
+            connectionFinalStatus = e;
+            // Move all current waiting requests to in flight queue.
+            while (!this.waitingRequestQueue.isEmpty()) {
+              AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
+              this.inflightRequestQueue.addLast(requestWrapper);
+            }
+          } finally {
+            lock.unlock();
+          }
+          cleanupInflightRequests();
+        });
     this.appendThread.start();
   }
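
The handler in this hunk records the failure, drains the waiting queue under the lock, and then fails every outstanding request. A minimal, self-contained sketch of that pattern follows. `FailFastWorker`, its `append` method, and the use of `CompletableFuture` are hypothetical stand-ins chosen for illustration; they are not the library's `ConnectionWorker` API, and the real class moves requests to an in-flight queue and calls `cleanupInflightRequests()` instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a worker with a background loop; not the real ConnectionWorker.
class FailFastWorker {
  private final ReentrantLock lock = new ReentrantLock();
  private final Deque<CompletableFuture<String>> waiting = new ArrayDeque<>();
  private Throwable finalStatus = null; // guarded by lock
  private final Thread loopThread;

  FailFastWorker(RuntimeException injected) {
    // The loop body only throws, simulating an unexpected failure inside the loop.
    loopThread = new Thread(() -> { throw injected; });
    loopThread.setUncaughtExceptionHandler(
        (Thread t, Throwable e) -> {
          List<CompletableFuture<String>> drained = new ArrayList<>();
          lock.lock();
          try {
            finalStatus = e; // later appends see this and fail immediately
            while (!waiting.isEmpty()) {
              drained.add(waiting.pollFirst());
            }
          } finally {
            lock.unlock();
          }
          // Fail the callers outside the lock so their callbacks run without holding it.
          for (CompletableFuture<String> f : drained) {
            f.completeExceptionally(e);
          }
        });
  }

  CompletableFuture<String> append(String row) {
    CompletableFuture<String> f = new CompletableFuture<>();
    lock.lock();
    try {
      if (finalStatus != null) {
        f.completeExceptionally(finalStatus); // worker is already dead: fail fast
      } else {
        waiting.addLast(f);
      }
    } finally {
      lock.unlock();
    }
    return f;
  }

  void start() {
    loopThread.start();
  }

  void join() throws InterruptedException {
    loopThread.join();
  }

  // Returns the message of the failure cause, or null if the future completed normally.
  static String failureMessage(CompletableFuture<String> f) throws InterruptedException {
    try {
      f.get();
      return null;
    } catch (ExecutionException ex) {
      return ex.getCause().getMessage();
    }
  }

  public static void main(String[] args) throws Exception {
    FailFastWorker worker = new FailFastWorker(new RuntimeException("Any exception can happen."));
    CompletableFuture<String> pending = worker.append("row-0");
    worker.start();
    worker.join(); // the handler runs on the dying thread, so it has finished by now
    System.out.println("pending append failed with: " + failureMessage(pending));
    System.out.println("late append failed with: " + failureMessage(worker.append("row-1")));
  }
}
```

Without such a handler, a runtime exception in the loop would end the thread silently and callers would wait on futures that never complete, which appears to be the situation this commit addresses.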

@@ -249,6 +274,8 @@ private void resetConnection() {
       // It's safe to directly close the previous connection as the in flight messages
       // will be picked up by the next connection.
       this.streamConnection.close();
+      Uninterruptibles.sleepUninterruptibly(
+          calculateSleepTimeMilli(conectionRetryCountWithoutCallback), TimeUnit.MILLISECONDS);
     }
     this.streamConnection =
         new StreamConnection(
@@ -391,6 +418,22 @@ private void maybeWaitForInflightQuota() {
     inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
   }

+  @VisibleForTesting
+  static long calculateSleepTimeMilli(long retryCount) {
+    return Math.min((long) Math.pow(2, retryCount), 60000);
+  }
+
+  @VisibleForTesting
+  void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) {
+    this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime;
+  }
+
+  @VisibleForTesting
+  void setTestOnlyRunTimeExceptionInAppendLoop(
+      RuntimeException testOnlyRunTimeExceptionInAppendLoop) {
+    this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop;
+  }
+
   public long getInflightWaitSeconds() {
     return inflightWaitSec.longValue();
   }
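
To make the retry schedule from `calculateSleepTimeMilli` concrete, the sketch below reproduces the same formula in a standalone class and prints the delay for a few retry counts. The class name `CappedBackoff`, the constant name, and the chosen retry counts are illustrative assumptions; only the formula itself comes from the diff.

```java
// Standalone illustration of the capped exponential backoff used in the diff.
// CappedBackoff and MAX_SLEEP_MILLIS are hypothetical names, not part of the library.
class CappedBackoff {
  static final long MAX_SLEEP_MILLIS = 60_000L;

  // 2^retryCount milliseconds, capped at 60 seconds. For very large retry counts the
  // double-to-long cast saturates at Long.MAX_VALUE, so the cap still applies.
  static long sleepMillis(long retryCount) {
    return Math.min((long) Math.pow(2, retryCount), MAX_SLEEP_MILLIS);
  }

  public static void main(String[] args) {
    for (long retry : new long[] {0, 1, 5, 10, 15, 16, 100}) {
      System.out.println("retry " + retry + " -> " + sleepMillis(retry) + " ms");
    }
  }
}
```

With this formula the first retry waits 1 ms, the delay doubles on each attempt, and the 60 second cap takes effect from retry count 16 onward, since 2^15 = 32768 is below the cap and 2^16 = 65536 is above it.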
@@ -524,6 +567,10 @@ private void appendLoop() {
       } finally {
         lock.unlock();
       }
+      if (testOnlyRunTimeExceptionInAppendLoop != null) {
+        Uninterruptibles.sleepUninterruptibly(testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS);
+        throw testOnlyRunTimeExceptionInAppendLoop;
+      }
       resetConnection();
       // Set firstRequestInConnection to indicate the next request to be sent should include
       // metedata. Reset everytime after reconnection.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 67 additions & 0 deletions
@@ -38,6 +38,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -351,6 +352,72 @@ public void testAppendButInflightQueueFull() throws Exception {
     }
   }

+  @Test
+  public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
+    ProtoSchema schema1 = createProtoSchema("foo");
+    StreamWriter sw1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+    ConnectionWorker connectionWorker =
+        new ConnectionWorker(
+            TEST_STREAM_1,
+            createProtoSchema("foo"),
+            100000,
+            100000,
+            Duration.ofSeconds(100),
+            FlowController.LimitExceededBehavior.Block,
+            TEST_TRACE_ID,
+            client.getSettings());
+    testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
+    ConnectionWorker.setMaxInflightQueueWaitTime(500);
+
+    long appendCount = 10;
+    for (int i = 0; i < appendCount; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+    connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop(
+        new RuntimeException("Any exception can happen."));
+    // Sleep 1 second before erroring out.
+    connectionWorker.setTestOnlyAppendLoopSleepTime(1000L);
+
+    // In total insert 5 requests,
+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+      assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
+    }
+
+    for (int i = 0; i < appendCount; i++) {
+      int finalI = i;
+      ExecutionException ex =
+          assertThrows(
+              ExecutionException.class,
+              () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+      assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
+    }
+
+    // The future append will directly fail.
+    ExecutionException ex =
+        assertThrows(
+            ExecutionException.class,
+            () ->
+                sendTestMessage(
+                        connectionWorker,
+                        sw1,
+                        createFooProtoRows(new String[] {String.valueOf(100)}),
+                        100)
+                    .get());
+    assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
+  }
+
+  @Test
+  public void testExponentialBackoff() throws Exception {
+    assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1);
+    assertThat(ConnectionWorker.calculateSleepTimeMilli(5)).isEqualTo(32);
+    assertThat(ConnectionWorker.calculateSleepTimeMilli(100)).isEqualTo(60000);
+  }
+
   private AppendRowsResponse createAppendResponse(long offset) {
     return AppendRowsResponse.newBuilder()
         .setAppendResult(
