Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 86dfc3a

Browse files
authored
fix: remove reconnection feature from client library (#849)
* fix: Remove reconnection logic * . * . * . * . * . * . * . * .
1 parent 687d48a commit 86dfc3a

3 files changed

Lines changed: 54 additions & 171 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 21 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
* without offset, please use a simpler writer {@code DirectWriter}.
6666
*
6767
* <p>A {@link StreamWrier} provides built-in capabilities to: handle batching of messages;
68-
* controlling memory utilization (through flow control); automatic connection re-establishment and
69-
* request cleanup (only keeps write schema on first request in the stream).
68+
* controlling memory utilization (through flow control) and request cleanup (only keeps write
69+
* schema on first request in the stream).
7070
*
7171
* <p>With customizable options that control:
7272
*
@@ -863,14 +863,20 @@ public void onStart(StreamController controller) {
863863

864864
private void abortInflightRequests(Throwable t) {
865865
synchronized (this.inflightBatches) {
866+
boolean first_error = true;
866867
while (!this.inflightBatches.isEmpty()) {
867868
InflightBatch inflightBatch = this.inflightBatches.poll();
868-
inflightBatch.onFailure(
869-
new AbortedException(
870-
"Request aborted due to previous failures",
871-
t,
872-
GrpcStatusCode.of(Status.Code.ABORTED),
873-
true));
869+
if (first_error) {
870+
inflightBatch.onFailure(t);
871+
first_error = false;
872+
} else {
873+
inflightBatch.onFailure(
874+
new AbortedException(
875+
"Request aborted due to previous failures",
876+
t,
877+
GrpcStatusCode.of(Status.Code.ABORTED),
878+
true));
879+
}
874880
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
875881
}
876882
}
@@ -913,7 +919,12 @@ public void onResponse(AppendRowsResponse response) {
913919
response.getAppendResult().getOffset().getValue(),
914920
inflightBatch.getExpectedOffset()));
915921
inflightBatch.onFailure(exception);
916-
abortInflightRequests(exception);
922+
abortInflightRequests(
923+
new AbortedException(
924+
"Request aborted due to previous failures",
925+
exception,
926+
GrpcStatusCode.of(Status.Code.ABORTED),
927+
true));
917928
} else {
918929
inflightBatch.onSuccess(response);
919930
}
@@ -931,56 +942,7 @@ public void onComplete() {
931942
@Override
932943
public void onError(Throwable t) {
933944
LOG.fine("OnError called");
934-
if (streamWriter.shutdown.get()) {
935-
abortInflightRequests(t);
936-
return;
937-
}
938-
InflightBatch inflightBatch = null;
939-
synchronized (this.inflightBatches) {
940-
if (inflightBatches.isEmpty()) {
941-
// The batches could have been aborted.
942-
return;
943-
}
944-
inflightBatch = this.inflightBatches.poll();
945-
}
946-
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
947-
if (isRecoverableError(t)) {
948-
try {
949-
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
950-
&& !streamWriter.shutdown.get()) {
951-
synchronized (streamWriter.currentRetries) {
952-
streamWriter.currentRetries++;
953-
}
954-
LOG.info(
955-
"Try to reestablish connection due to transient error: "
956-
+ t.toString()
957-
+ " retry times: "
958-
+ streamWriter.currentRetries);
959-
streamWriter.refreshAppend();
960-
LOG.info("Resending requests on after connection established");
961-
streamWriter.writeBatch(inflightBatch);
962-
} else {
963-
inflightBatch.onFailure(t);
964-
abortInflightRequests(t);
965-
synchronized (streamWriter.currentRetries) {
966-
streamWriter.currentRetries = 0;
967-
}
968-
}
969-
} catch (InterruptedException e) {
970-
LOG.info("Got exception while retrying: " + e.toString());
971-
inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED));
972-
abortInflightRequests(new StatusRuntimeException(Status.ABORTED));
973-
synchronized (streamWriter.currentRetries) {
974-
streamWriter.currentRetries = 0;
975-
}
976-
}
977-
} else {
978-
inflightBatch.onFailure(t);
979-
abortInflightRequests(t);
980-
synchronized (streamWriter.currentRetries) {
981-
streamWriter.currentRetries = 0;
982-
}
983-
}
945+
abortInflightRequests(t);
984946
}
985947
};
986948

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,16 @@ private void notifyNextAcquires() {
6464
}
6565
}
6666

67-
public synchronized void release(long messageSize) {
67+
public synchronized void release(long messageSize) throws IllegalStateException {
6868
lock.lock();
6969
--pendingCount;
70+
if (pendingCount < 0) {
71+
throw new IllegalStateException("pendingCount cannot be less than 0");
72+
}
7073
pendingSize -= messageSize;
74+
if (pendingSize < 0) {
75+
throw new IllegalStateException("pendingSize cannot be less than 0");
76+
}
7177
notifyNextAcquires();
7278
lock.unlock();
7379
notifyAll();

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java

Lines changed: 26 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import com.google.api.gax.grpc.testing.LocalChannelProvider;
3333
import com.google.api.gax.grpc.testing.MockGrpcService;
3434
import com.google.api.gax.grpc.testing.MockServiceHelper;
35-
import com.google.api.gax.retrying.RetrySettings;
35+
import com.google.api.gax.rpc.AbortedException;
3636
import com.google.api.gax.rpc.DataLossException;
3737
import com.google.cloud.bigquery.storage.test.Test.FooType;
3838
import com.google.common.base.Strings;
@@ -510,42 +510,6 @@ public void testFlowControlBehaviorException() throws Exception {
510510
}
511511
}
512512

513-
@Test
514-
public void testStreamReconnectionTransient() throws Exception {
515-
StreamWriter writer =
516-
getTestStreamWriterBuilder()
517-
.setBatchingSettings(
518-
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
519-
.toBuilder()
520-
.setDelayThreshold(Duration.ofSeconds(100000))
521-
.setElementCountThreshold(1L)
522-
.setFlowControlSettings(
523-
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
524-
.toBuilder()
525-
.setMaxOutstandingElementCount(1L)
526-
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
527-
.build())
528-
.build())
529-
.build();
530-
531-
testBigQueryWrite.addResponse(
532-
AppendRowsResponse.newBuilder()
533-
.setAppendResult(
534-
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
535-
.build());
536-
testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE));
537-
testBigQueryWrite.addResponse(
538-
AppendRowsResponse.newBuilder()
539-
.setAppendResult(
540-
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
541-
.build());
542-
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
543-
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
544-
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
545-
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
546-
writer.close();
547-
}
548-
549513
@Test
550514
public void testStreamReconnectionPermanant() throws Exception {
551515
StreamWriter writer =
@@ -569,36 +533,6 @@ public void testStreamReconnectionPermanant() throws Exception {
569533
writer.close();
570534
}
571535

572-
@Test
573-
public void testStreamReconnectionExceedRetry() throws Exception {
574-
StreamWriter writer =
575-
getTestStreamWriterBuilder()
576-
.setBatchingSettings(
577-
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
578-
.toBuilder()
579-
.setDelayThreshold(Duration.ofSeconds(100000))
580-
.setElementCountThreshold(1L)
581-
.build())
582-
.setRetrySettings(
583-
RetrySettings.newBuilder()
584-
.setMaxRetryDelay(Duration.ofMillis(100))
585-
.setMaxAttempts(1)
586-
.build())
587-
.build();
588-
assertEquals(1, writer.getRetrySettings().getMaxAttempts());
589-
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
590-
testBigQueryWrite.addException(transientError);
591-
testBigQueryWrite.addException(transientError);
592-
ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"toomanyretry"});
593-
try {
594-
future3.get();
595-
Assert.fail("This should fail.");
596-
} catch (ExecutionException e) {
597-
assertEquals(transientError.toString(), e.getCause().getCause().toString());
598-
}
599-
writer.close();
600-
}
601-
602536
@Test
603537
public void testOffset() throws Exception {
604538
try (StreamWriter writer =
@@ -665,7 +599,7 @@ public void testOffsetMismatch() throws Exception {
665599

666600
@Test
667601
public void testErrorPropagation() throws Exception {
668-
try (StreamWriter writer =
602+
StreamWriter writer =
669603
getTestStreamWriterBuilder()
670604
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
671605
.setBatchingSettings(
@@ -674,13 +608,23 @@ public void testErrorPropagation() throws Exception {
674608
.setElementCountThreshold(1L)
675609
.setDelayThreshold(Duration.ofSeconds(5))
676610
.build())
677-
.build()) {
678-
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
679-
sendTestMessage(writer, new String[] {"A"}).get();
611+
.build();
612+
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
613+
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
614+
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"A"});
615+
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"B"});
616+
try {
617+
future1.get();
680618
fail("should throw exception");
681619
} catch (ExecutionException e) {
682620
assertThat(e.getCause()).isInstanceOf(DataLossException.class);
683621
}
622+
try {
623+
future2.get();
624+
fail("should throw exception");
625+
} catch (ExecutionException e) {
626+
assertThat(e.getCause()).isInstanceOf(AbortedException.class);
627+
}
684628
}
685629

686630
@Test
@@ -957,43 +901,6 @@ public void testFlushAll() throws Exception {
957901
writer.close();
958902
}
959903

960-
@Test
961-
public void testFlushAllFailed() throws Exception {
962-
StreamWriter writer =
963-
getTestStreamWriterBuilder()
964-
.setBatchingSettings(
965-
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
966-
.toBuilder()
967-
.setElementCountThreshold(2L)
968-
.setDelayThreshold(Duration.ofSeconds(100000))
969-
.build())
970-
.build();
971-
972-
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
973-
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
974-
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
975-
976-
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
977-
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
978-
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
979-
980-
assertFalse(appendFuture3.isDone());
981-
try {
982-
writer.flushAll(100000);
983-
fail("Should have thrown an Exception");
984-
} catch (Exception expected) {
985-
if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) {
986-
LOG.info("got: " + expected.toString());
987-
} else {
988-
fail("Unexpected exception:" + expected.toString());
989-
}
990-
}
991-
992-
assertTrue(appendFuture3.isDone());
993-
994-
writer.close();
995-
}
996-
997904
@Test
998905
public void testDatasetTraceId() throws Exception {
999906
StreamWriter writer =
@@ -1032,10 +939,12 @@ public void testShutdownWithConnectionError() throws Exception {
1032939
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
1033940
.build());
1034941
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
942+
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
1035943
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
1036944

1037945
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1038946
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
947+
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"B"});
1039948
Thread.sleep(5000L);
1040949
// Move the needle for responses to be sent.
1041950
fakeExecutor.advanceTime(Duration.ofSeconds(20));
@@ -1044,9 +953,15 @@ public void testShutdownWithConnectionError() throws Exception {
1044953
assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
1045954
try {
1046955
appendFuture2.get();
1047-
fail("Should fail with exception");
1048-
} catch (java.util.concurrent.ExecutionException e) {
1049-
assertEquals("Request aborted due to previous failures", e.getCause().getMessage());
956+
fail("Should fail with exception future2");
957+
} catch (ExecutionException e) {
958+
assertThat(e.getCause()).isInstanceOf(DataLossException.class);
959+
}
960+
try {
961+
appendFuture3.get();
962+
fail("Should fail with exception future3");
963+
} catch (ExecutionException e) {
964+
assertThat(e.getCause()).isInstanceOf(AbortedException.class);
1050965
}
1051966
}
1052967
}

0 commit comments

Comments
 (0)