Skip to content

Commit 85268dd

Browse files
committed
update error codes
1 parent 7535015 commit 85268dd

1 file changed

Lines changed: 12 additions & 5 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.services.bigquery.model.TableSchema;
2424
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
25+
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
2526
import com.google.cloud.bigquery.storage.v1.ProtoRows;
2627
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
2728
import com.google.protobuf.Descriptors.Descriptor;
@@ -422,12 +423,15 @@ public String toString() {
422423
// The first context is always the one that fails.
423424
AppendRowsContext failedContext =
424425
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
425-
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
426426
// Invalidate the StreamWriter and force a new one to be created.
427427
LOG.error(
428428
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
429429
clearClients.accept(contexts);
430430
appendFailures.inc();
431+
432+
boolean explicitStreamFinalized =
433+
failedContext.getError() instanceof StreamFinalizedException;
434+
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
431435
// This means that the offset we have stored does not match the current end of
432436
// the stream in the Storage API. Usually this happens because a crash or a bundle
433437
// failure
@@ -438,10 +442,13 @@ public String toString() {
438442
boolean offsetMismatch =
439443
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
440444
// This implies that the stream doesn't exist or has already been finalized. In this
441-
// case we have no
442-
// choice but to create a new stream.
443-
boolean streamDoesntExist = statusCode.equals(Code.INVALID_ARGUMENT);
444-
if (offsetMismatch || streamDoesntExist) {
445+
// case we have no choice but to create a new stream.
446+
boolean streamDoesNotExist =
447+
explicitStreamFinalized
448+
|| statusCode.equals(Code.INVALID_ARGUMENT)
449+
|| statusCode.equals(Code.NOT_FOUND)
450+
|| statusCode.equals(Code.FAILED_PRECONDITION);
451+
if (offsetMismatch || streamDoesNotExist) {
445452
appendOffsetFailures.inc();
446453
LOG.warn(
447454
"Append to "

0 commit comments

Comments
 (0)