Commit 91da88b

fix: add client shutdown if request waiting in request queue for too long. (#2017)
* feat: Split writer into connection worker and wrapper; this is a prerequisite for the multiplexing client
* feat: add connection worker pool skeleton, used for the multiplexing client
* feat: add Load API for connection worker for the multiplexing client
* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination
* 🦉 Updates from OwlBot post-processor. See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside the fake BigQuery write impl for getting the response from offset
* feat: wire multiplexing connection pool to stream writer
* feat: some fixes for multiplexing client
* feat: fix some TODOs, and reject the mixed behavior of passed-in client or not
* feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field
* feat: add getInflightWaitSeconds implementation
* feat: add schema comparison in connection loop to ensure schema update for the same stream name can be notified
* feat: add schema update support to multiplexing
* fix: fix Windows build bug: Windows Instant resolution is different from Linux
* fix: fix another failing test for Windows build
* fix: fix another test failure for Windows build
* feat: change new thread for each retry to a thread pool to avoid creating/tearing down too many threads if lots of retries happen
* fix: add back the background executor provider that was accidentally removed
* feat: throw error when using connection pool for explicit stream
* fix: add precision truncation to the passed-in value from JSON float and double type
* modify the BOM version
* fix deadlock issue in ConnectionWorkerPool
* fix: fix deadlock issue during close + append for multiplexing
* fix: fix one potential root cause of deadlock issue for non-multiplexing case
* Add timeout to inflight queue waiting, and also add some extra logs
* feat: allow Java client lib to handle switching table schema for the same stream name
* fix: close before retry connection
* fix: add client-side timeout if inflight request waits too long

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent de00447 commit 91da88b
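
For readers skimming the diff, the timeout added by this commit can be summarized as: stamp each request once when it moves to the inflight queue, then on every loop tick compare the age of the oldest inflight request against a maximum. The sketch below is a minimal, self-contained illustration of that idea, not the library's code. `InflightWatchdogSketch`, `Request`, `send`, `onCallback`, and `checkOldest` are hypothetical names, and the clock is passed in explicitly only to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified illustration; not the ConnectionWorker implementation.
class InflightWatchdogSketch {

  /** Minimal request wrapper: the timestamp is set only once, on first insertion. */
  static final class Request {
    Instant inflightSince;

    void trySetInflightTime(Instant now) {
      if (inflightSince == null) {
        inflightSince = now;
      }
    }
  }

  private final Deque<Request> inflight = new ArrayDeque<>();
  private final Duration maxWait;

  InflightWatchdogSketch(Duration maxWait) {
    this.maxWait = maxWait;
  }

  /** Moves a request into the inflight queue and stamps it. */
  void send(Request request, Instant now) {
    request.trySetInflightTime(now);
    inflight.addLast(request);
  }

  /** Simulates a response callback for the oldest inflight request. */
  void onCallback() {
    inflight.pollFirst();
  }

  /** Throws if the oldest inflight request is older than maxWait; does nothing when idle. */
  void checkOldest(Instant now) {
    if (inflight.isEmpty()) {
      return;
    }
    Duration waited = Duration.between(inflight.getFirst().inflightSince, now);
    if (waited.compareTo(maxWait) > 0) {
      throw new IllegalStateException(
          String.format(
              "Request has waited in inflight queue for %dms, over maximum wait time %s",
              waited.toMillis(), maxWait));
    }
  }

  public static void main(String[] args) {
    Instant start = Instant.now();
    InflightWatchdogSketch watchdog = new InflightWatchdogSketch(Duration.ofSeconds(1));
    watchdog.checkOldest(start.plusSeconds(60)); // idle: nothing inflight, so no error
    watchdog.send(new Request(), start);
    watchdog.checkOldest(start.plusMillis(500)); // still within the limit
    try {
      watchdog.checkOldest(start.plusSeconds(2)); // over the limit
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Checking only the head of the queue is enough in this sketch because requests are stamped in insertion order, so the first element is always the oldest. An empty queue is never treated as a timeout, which matches the intent of `testLongTimeIdleWontFail` in the diff.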

2 files changed

Lines changed: 150 additions & 4 deletions

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 47 additions & 4 deletions
@@ -33,6 +33,7 @@
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
@@ -66,6 +67,14 @@ class ConnectionWorker implements AutoCloseable {
   // Maximum wait time on inflight quota before error out.
   private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;
 
+  /*
+   * Maximum time waiting for request callback before shutting down the connection.
+   *
+   * We constantly check how long we have been waiting for the next request callback. If we
+   * wait too long, we start shutting down the connections and cleaning up the queues.
+   */
+  private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
+
   private Lock lock;
   private Condition hasMessageInWaitingQueue;
   private Condition inflightReduced;
@@ -273,7 +282,6 @@ public void run() {
         log.warning(
             "Exception thrown from append loop, thus stream writer is shutdown due to exception: "
                 + e.toString());
-        e.printStackTrace();
         lock.lock();
         try {
           connectionFinalStatus = e;
@@ -507,7 +515,7 @@ public void close() {
     } finally {
       this.lock.unlock();
     }
-    log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
+    log.info("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
     try {
       appendThread.join();
     } catch (InterruptedException e) {
@@ -525,6 +533,7 @@ public void close() {
       // Backend request has a 2 minute timeout, so wait a little longer than that.
       this.client.awaitTermination(150, TimeUnit.SECONDS);
     } catch (InterruptedException ignored) {
+      log.warning("Client await termination timeout in writer id " + writerId);
     }
 
     try {
@@ -569,6 +578,11 @@ private void appendLoop() {
       this.lock.lock();
       try {
         hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
+        // Check whether we should error out the current append loop.
+        if (inflightRequestQueue.size() > 0) {
+          throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp);
+        }
+
         // Copy the streamConnectionIsConnected guarded by lock to a local variable.
         // In addition, only reconnect if there is a retriable error.
         streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null;
@@ -583,6 +597,7 @@ private void appendLoop() {
         }
         while (!this.waitingRequestQueue.isEmpty()) {
           AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
+          requestWrapper.trySetRequestInsertQueueTime();
           this.inflightRequestQueue.addLast(requestWrapper);
           localQueue.addLast(requestWrapper);
         }
@@ -703,6 +718,17 @@ private void appendLoop() {
     log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
   }
 
+  private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
+    Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
+    if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
+      throw new RuntimeException(
+          String.format(
+              "Request has waited in inflight queue for %sms for writer %s, "
+                  + "which is over maximum wait time %s",
+              milliSinceLastCallback.toMillis(), writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME));
+    }
+  }
+
   /*
    * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue.
    *
@@ -740,6 +766,7 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
     }
     this.lock.lock();
     try {
+      log.warning("Donecallback is not triggered within timeout frame for writer " + writerId);
       if (connectionFinalStatus == null) {
         connectionFinalStatus =
             new StatusRuntimeException(
@@ -883,7 +910,7 @@ private boolean isConnectionErrorRetriable(Throwable t) {
   }
 
   private void doneCallback(Throwable finalStatus) {
-    log.fine(
+    log.info(
         "Received done callback. Stream: "
             + streamName
             + " worker id: "
@@ -923,7 +950,9 @@ private void doneCallback(Throwable finalStatus) {
               "Connection finished with error "
                   + finalStatus.toString()
                   + " for stream "
-                  + streamName);
+                  + streamName
+                  + " with write id: "
+                  + writerId);
         }
       }
     } finally {
@@ -955,12 +984,21 @@ static final class AppendRequestAndResponse {
     // The writer that issues the call of the request.
     final StreamWriter streamWriter;
 
+    Instant requestCreationTimeStamp;
+
     AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
       this.appendResult = SettableApiFuture.create();
       this.message = message;
       this.messageSize = message.getProtoRows().getSerializedSize();
       this.streamWriter = streamWriter;
     }
+
+    void trySetRequestInsertQueueTime() {
+      // Only set the first time the caller tries to set the timestamp.
+      if (requestCreationTimeStamp == null) {
+        requestCreationTimeStamp = Instant.now();
+      }
+    }
   }
 
   /** Returns the current workload of this worker. */
@@ -1051,6 +1089,11 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
     INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime;
   }
 
+  @VisibleForTesting
+  static void setMaxInflightRequestWaitTime(Duration waitTime) {
+    MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
+  }
+
   @AutoValue
   abstract static class TableSchemaAndTimestamp {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 103 additions & 0 deletions
@@ -61,6 +61,7 @@ public class ConnectionWorkerTest {
   public void setUp() throws Exception {
     testBigQueryWrite = new FakeBigQueryWrite();
     ConnectionWorker.setMaxInflightQueueWaitTime(300000);
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10));
     serviceHelper =
         new MockServiceHelper(
             UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
@@ -607,4 +608,106 @@ public void testLoadIsOverWhelmed() {
     Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
     assertThat(load2.isOverwhelmed()).isFalse();
   }
+
+  @Test
+  public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
+    ProtoSchema schema1 = createProtoSchema("foo");
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+    StreamWriter sw1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+    ConnectionWorker connectionWorker =
+        new ConnectionWorker(
+            TEST_STREAM_1,
+            null,
+            createProtoSchema("foo"),
+            100000,
+            100000,
+            Duration.ofSeconds(100),
+            FlowController.LimitExceededBehavior.Block,
+            TEST_TRACE_ID,
+            client.getSettings());
+    testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
+
+    long appendCount = 10;
+    for (int i = 0; i < appendCount; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // Insert appendCount requests in total.
+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+      assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
+    }
+
+    for (int i = 0; i < appendCount; i++) {
+      int finalI = i;
+      ExecutionException ex =
+          assertThrows(
+              ExecutionException.class,
+              () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+      assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+    }
+
+    // The future append will directly fail.
+    ExecutionException ex =
+        assertThrows(
+            ExecutionException.class,
+            () ->
+                sendTestMessage(
+                        connectionWorker,
+                        sw1,
+                        createFooProtoRows(new String[] {String.valueOf(100)}),
+                        100)
+                    .get());
+    assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+  }
+
+  @Test
+  public void testLongTimeIdleWontFail() throws Exception {
+    ProtoSchema schema1 = createProtoSchema("foo");
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+    StreamWriter sw1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+    ConnectionWorker connectionWorker =
+        new ConnectionWorker(
+            TEST_STREAM_1,
+            null,
+            createProtoSchema("foo"),
+            100000,
+            100000,
+            Duration.ofSeconds(100),
+            FlowController.LimitExceededBehavior.Block,
+            TEST_TRACE_ID,
+            client.getSettings());
+
+    long appendCount = 10;
+    for (int i = 0; i < appendCount * 2; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // Insert the first batch of appendCount requests.
+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+    }
+    // Sleep 2 seconds to make sure request queue is empty.
+    Thread.sleep(2000);
+    assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 0);
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker,
+              sw1,
+              createFooProtoRows(new String[] {String.valueOf(i)}),
+              i + appendCount));
+    }
+    for (int i = 0; i < appendCount * 2; i++) {
+      assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+    }
+  }
 }
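
The first new test above expects every pending append to fail with the timeout message and any later append to fail immediately. One way such behavior could be arranged is sketched below, assuming a hypothetical `FailPendingSketch` class built on `CompletableFuture`. The real `ConnectionWorker` uses its own future and status types, so treat this only as an illustration of the pattern, not as the library's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration of "fail everything pending, then reject new work".
class FailPendingSketch {
  private final Deque<CompletableFuture<Long>> inflight = new ArrayDeque<>();
  // Non-null once the worker has shut down; remembered so later appends fail fast.
  private Throwable finalStatus;

  /** Queues an append, or fails it immediately if the worker is already shut down. */
  synchronized CompletableFuture<Long> append() {
    CompletableFuture<Long> result = new CompletableFuture<>();
    if (finalStatus != null) {
      result.completeExceptionally(finalStatus);
    } else {
      inflight.addLast(result);
    }
    return result;
  }

  /** Records the first shutdown cause and fails every pending append with it. */
  synchronized void shutdown(Throwable cause) {
    if (finalStatus == null) {
      finalStatus = cause;
    }
    CompletableFuture<Long> pending;
    while ((pending = inflight.pollFirst()) != null) {
      pending.completeExceptionally(finalStatus);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FailPendingSketch worker = new FailPendingSketch();
    CompletableFuture<Long> first = worker.append();
    worker.shutdown(new RuntimeException("Request has waited in inflight queue"));
    CompletableFuture<Long> later = worker.append();
    for (CompletableFuture<Long> future : List.of(first, later)) {
      try {
        future.get();
      } catch (ExecutionException e) {
        // Both the pending and the later append surface the same cause.
        System.out.println(e.getCause().getMessage());
      }
    }
  }
}
```

Keeping the first failure cause and reusing it for later appends means callers always see why the connection was shut down, rather than a generic "closed" error.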
