Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 6134818

Browse files
authored
feat: Add a flushAll() method that will flush all the inflight requests and make sure all responses are returned (#492)
* feat: Add a flushAll method that will make sure all the request in the pipe are sent and responded * . * . * . * . * Add timeout * Add timeout fix * . * . * . * Lock flush and append operations. * make sure lock is released * remove timeout test since it is flaky in public runs * remove timeout test since it is flaky in public runs * fix lint * remove a test * .
1 parent 89f3fca commit 6134818

3 files changed

Lines changed: 151 additions & 35 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

Lines changed: 80 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ public class StreamWriter implements AutoCloseable {
9999
private final Lock appendAndRefreshAppendLock;
100100
private final MessagesBatch messagesBatch;
101101

102+
// Indicates whether a non-recoverable exception has happened on the stream.
103+
private final Lock exceptionLock;
104+
private Throwable streamException;
105+
102106
private BackgroundResource backgroundResources;
103107
private List<BackgroundResource> backgroundResourceList;
104108

@@ -145,10 +149,13 @@ private StreamWriter(Builder builder)
145149

146150
this.batchingSettings = builder.batchingSettings;
147151
this.retrySettings = builder.retrySettings;
148-
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName);
152+
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
149153
messagesBatchLock = new ReentrantLock();
150154
appendAndRefreshAppendLock = new ReentrantLock();
151155
activeAlarm = new AtomicBoolean(false);
156+
this.exceptionLock = new ReentrantLock();
157+
this.streamException = null;
158+
152159
executor = builder.executorProvider.getExecutor();
153160
backgroundResourceList = new ArrayList<>();
154161
if (builder.executorProvider.shouldAutoClose()) {
@@ -212,6 +219,14 @@ public Boolean expired() {
212219
return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
213220
}
214221

222+
private void setException(Throwable t) {
223+
exceptionLock.lock();
224+
if (this.streamException == null) {
225+
this.streamException = t;
226+
}
227+
exceptionLock.unlock();
228+
}
229+
215230
/**
216231
* Schedules the writing of a message. The write of the message may occur immediately or be
217232
* delayed based on the writer batching options.
@@ -265,6 +280,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
265280
return outstandingAppend.appendResult;
266281
}
267282

283+
/**
284+
* This is the general flush method for asynchronous append operations. When you have outstanding
285+
* append requests, calling flush will make sure all outstanding append requests are completed and
286+
* successful. Otherwise there will be an exception thrown.
287+
*
288+
* @throws Exception if an append failed on the stream, or if waiting for completion timed out or was interrupted
289+
*/
290+
public void flushAll(long timeoutMillis) throws Exception {
291+
appendAndRefreshAppendLock.lock();
292+
try {
293+
writeAllOutstanding();
294+
synchronized (messagesWaiter) {
295+
messagesWaiter.waitComplete(timeoutMillis);
296+
}
297+
} finally {
298+
appendAndRefreshAppendLock.unlock();
299+
}
300+
exceptionLock.lock();
301+
try {
302+
if (streamException != null) {
303+
throw new Exception(streamException);
304+
}
305+
} finally {
306+
exceptionLock.unlock();
307+
}
308+
}
309+
268310
/**
269311
* Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be
270312
* available for read. If no exception is thrown, it means the flush happened.
@@ -411,14 +453,15 @@ private static final class InflightBatch {
411453
private long expectedOffset;
412454
private Boolean attachSchema;
413455
private String streamName;
414-
415456
private final AtomicBoolean failed;
457+
private final StreamWriter streamWriter;
416458

417459
InflightBatch(
418460
List<AppendRequestAndFutureResponse> inflightRequests,
419461
long batchSizeBytes,
420462
String streamName,
421-
Boolean attachSchema) {
463+
Boolean attachSchema,
464+
StreamWriter streamWriter) {
422465
this.inflightRequests = inflightRequests;
423466
this.offsetList = new ArrayList<Long>(inflightRequests.size());
424467
for (AppendRequestAndFutureResponse request : inflightRequests) {
@@ -435,6 +478,7 @@ private static final class InflightBatch {
435478
this.attachSchema = attachSchema;
436479
this.streamName = streamName;
437480
this.failed = new AtomicBoolean(false);
481+
this.streamWriter = streamWriter;
438482
}
439483

440484
int count() {
@@ -482,7 +526,9 @@ private void onFailure(Throwable t) {
482526
return;
483527
} else {
484528
LOG.info("Setting " + t.toString() + " on response");
529+
this.streamWriter.setException(t);
485530
}
531+
486532
for (AppendRequestAndFutureResponse request : inflightRequests) {
487533
request.appendResult.setException(t);
488534
}
@@ -552,8 +598,12 @@ protected void shutdown() {
552598
currentAlarmFuture.cancel(false);
553599
}
554600
writeAllOutstanding();
555-
synchronized (messagesWaiter) {
556-
messagesWaiter.waitComplete();
601+
try {
602+
synchronized (messagesWaiter) {
603+
messagesWaiter.waitComplete(0);
604+
}
605+
} catch (InterruptedException e) {
606+
LOG.warning("Failed to wait for messages to return " + e.toString());
557607
}
558608
if (clientStream.isSendReady()) {
559609
clientStream.closeSend();
@@ -820,14 +870,14 @@ public void onStart(StreamController controller) {
820870
private void abortInflightRequests(Throwable t) {
821871
synchronized (this.inflightBatches) {
822872
while (!this.inflightBatches.isEmpty()) {
823-
this.inflightBatches
824-
.poll()
825-
.onFailure(
826-
new AbortedException(
827-
"Request aborted due to previous failures",
828-
t,
829-
GrpcStatusCode.of(Status.Code.ABORTED),
830-
true));
873+
InflightBatch inflightBatch = this.inflightBatches.poll();
874+
inflightBatch.onFailure(
875+
new AbortedException(
876+
"Request aborted due to previous failures",
877+
t,
878+
GrpcStatusCode.of(Status.Code.ABORTED),
879+
true));
880+
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
831881
}
832882
}
833883
}
@@ -850,13 +900,15 @@ public void onResponse(AppendRowsResponse response) {
850900
streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS);
851901
}
852902
}
853-
// TODO: Deal with in stream errors.
903+
// Currently nothing is retryable. If the error is ALREADY_EXISTS, ignore it.
854904
if (response.hasError()) {
855-
StatusRuntimeException exception =
856-
new StatusRuntimeException(
857-
Status.fromCodeValue(response.getError().getCode())
858-
.withDescription(response.getError().getMessage()));
859-
inflightBatch.onFailure(exception);
905+
if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) {
906+
StatusRuntimeException exception =
907+
new StatusRuntimeException(
908+
Status.fromCodeValue(response.getError().getCode())
909+
.withDescription(response.getError().getMessage()));
910+
inflightBatch.onFailure(exception);
911+
}
860912
}
861913
if (inflightBatch.getExpectedOffset() > 0
862914
&& response.getOffset() != inflightBatch.getExpectedOffset()) {
@@ -907,30 +959,25 @@ public void onError(Throwable t) {
907959
}
908960
} else {
909961
inflightBatch.onFailure(t);
962+
abortInflightRequests(t);
910963
synchronized (streamWriter.currentRetries) {
911964
streamWriter.currentRetries = 0;
912965
}
913966
}
914967
} catch (IOException | InterruptedException e) {
915968
LOG.info("Got exception while retrying.");
916969
inflightBatch.onFailure(e);
970+
abortInflightRequests(e);
917971
synchronized (streamWriter.currentRetries) {
918972
streamWriter.currentRetries = 0;
919973
}
920974
}
921975
} else {
922976
inflightBatch.onFailure(t);
977+
abortInflightRequests(t);
923978
synchronized (streamWriter.currentRetries) {
924979
streamWriter.currentRetries = 0;
925980
}
926-
try {
927-
if (!streamWriter.shutdown.get()) {
928-
// Establish a new connection.
929-
streamWriter.refreshAppend();
930-
}
931-
} catch (IOException | InterruptedException e) {
932-
LOG.info("Failed to establish a new connection");
933-
}
934981
}
935982
} finally {
936983
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
@@ -945,17 +992,21 @@ private static class MessagesBatch {
945992
private final BatchingSettings batchingSettings;
946993
private Boolean attachSchema = true;
947994
private final String streamName;
995+
private final StreamWriter streamWriter;
948996

949-
private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
997+
private MessagesBatch(
998+
BatchingSettings batchingSettings, String streamName, StreamWriter streamWriter) {
950999
this.batchingSettings = batchingSettings;
9511000
this.streamName = streamName;
1001+
this.streamWriter = streamWriter;
9521002
reset();
9531003
}
9541004

9551005
// Get all the messages out in a batch.
9561006
private InflightBatch popBatch() {
9571007
InflightBatch batch =
958-
new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
1008+
new InflightBatch(
1009+
messages, batchedBytes, this.streamName, this.attachSchema, this.streamWriter);
9591010
this.attachSchema = false;
9601011
reset();
9611012
return batch;

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,22 @@ public void acquire(long messageSize) throws FlowController.FlowControlException
146146
}
147147
}
148148

149-
public synchronized void waitComplete() {
149+
public synchronized void waitComplete(long timeoutMillis) throws InterruptedException {
150+
long end = System.currentTimeMillis() + timeoutMillis;
150151
lock.lock();
151152
try {
152-
while (pendingCount > 0) {
153+
while (pendingCount > 0 && (timeoutMillis == 0 || end > System.currentTimeMillis())) {
153154
lock.unlock();
154155
try {
155-
wait();
156+
wait(timeoutMillis == 0 ? 0 : end - System.currentTimeMillis());
156157
} catch (InterruptedException e) {
157-
LOG.warning("Interrupted while waiting for completion");
158+
throw e;
158159
}
159160
lock.lock();
160161
}
161-
} catch (Exception e) {
162-
LOG.warning(e.toString());
162+
if (pendingCount > 0) {
163+
throw new InterruptedException("Wait timeout");
164+
}
163165
} finally {
164166
lock.unlock();
165167
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,4 +834,67 @@ public void testExistingClient() throws Exception {
834834
client.shutdown();
835835
client.awaitTermination(1, TimeUnit.MINUTES);
836836
}
837+
838+
@Test
839+
public void testFlushAll() throws Exception {
840+
StreamWriter writer =
841+
getTestStreamWriterBuilder()
842+
.setBatchingSettings(
843+
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
844+
.toBuilder()
845+
.setElementCountThreshold(2L)
846+
.setDelayThreshold(Duration.ofSeconds(100000))
847+
.build())
848+
.build();
849+
850+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
851+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
852+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
853+
854+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
855+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
856+
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
857+
858+
assertFalse(appendFuture3.isDone());
859+
writer.flushAll(100000);
860+
861+
assertTrue(appendFuture3.isDone());
862+
863+
writer.close();
864+
}
865+
866+
@Test
867+
public void testFlushAllFailed() throws Exception {
868+
StreamWriter writer =
869+
getTestStreamWriterBuilder()
870+
.setBatchingSettings(
871+
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
872+
.toBuilder()
873+
.setElementCountThreshold(2L)
874+
.setDelayThreshold(Duration.ofSeconds(100000))
875+
.build())
876+
.build();
877+
878+
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
879+
880+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
881+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
882+
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
883+
884+
assertFalse(appendFuture3.isDone());
885+
try {
886+
writer.flushAll(100000);
887+
fail("Should have thrown an Exception");
888+
} catch (Exception expected) {
889+
if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) {
890+
LOG.info("got: " + expected.toString());
891+
} else {
892+
fail("Unexpected exception:" + expected.toString());
893+
}
894+
}
895+
896+
assertTrue(appendFuture3.isDone());
897+
898+
writer.close();
899+
}
837900
}

0 commit comments

Comments
 (0)