Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 725ce24

Browse files
committed
fix: UNAVAILABLE error on first query could cause transaction to get stuck
If the first query or read operation of a read/write transaction would return UNAVAILABLE for the first element of the result stream, the transaction could get stuck. This was caused by the internal retry mechanism that would wait for the initial attempt to return a transaction, which was never returned as the UNAVAILABLE exception was internally handled by the result stream iterator. Fixes #799
1 parent 1a71e50 commit 725ce24

5 files changed

Lines changed: 160 additions & 13 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
555555
}
556556

557557
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
558-
Statement statement, QueryMode queryMode, Options options) {
558+
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
559559
ExecuteSqlRequest.Builder builder =
560560
ExecuteSqlRequest.newBuilder()
561561
.setSql(statement.getSql())
@@ -569,9 +569,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
569569
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
570570
}
571571
}
572-
TransactionSelector selector = getTransactionSelector();
573-
if (selector != null) {
574-
builder.setTransaction(selector);
572+
if (withTransactionSelector) {
573+
TransactionSelector selector = getTransactionSelector();
574+
if (selector != null) {
575+
builder.setTransaction(selector);
576+
}
575577
}
576578
builder.setSeqno(getSeqNo());
577579
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -616,18 +618,26 @@ ResultSet executeQueryInternalWithOptions(
616618
beforeReadOrQuery();
617619
final int prefetchChunks =
618620
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
621+
final ExecuteSqlRequest.Builder request =
622+
getExecuteSqlRequestBuilder(
623+
statement, queryMode, options, /* withTransactionSelector = */ false);
619624
ResumableStreamIterator stream =
620625
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
621626
@Override
622627
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
623628
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
624-
final ExecuteSqlRequest.Builder request =
625-
getExecuteSqlRequestBuilder(statement, queryMode, options);
626629
if (partitionToken != null) {
627630
request.setPartitionToken(partitionToken);
628631
}
632+
TransactionSelector selector = null;
629633
if (resumeToken != null) {
630634
request.setResumeToken(resumeToken);
635+
selector = getTransactionSelector();
636+
} else if (!request.hasTransaction()) {
637+
selector = getTransactionSelector();
638+
}
639+
if (selector != null) {
640+
request.setTransaction(selector);
631641
}
632642
SpannerRpc.StreamingCall call =
633643
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -735,10 +745,13 @@ ResultSet readInternalWithOptions(
735745
@Override
736746
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
737747
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
748+
TransactionSelector selector = null;
738749
if (resumeToken != null) {
739750
builder.setResumeToken(resumeToken);
751+
selector = getTransactionSelector();
752+
} else if (!builder.hasTransaction()) {
753+
selector = getTransactionSelector();
740754
}
741-
TransactionSelector selector = getTransactionSelector();
742755
if (selector != null) {
743756
builder.setTransaction(selector);
744757
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,7 @@ protected PartialResultSet computeNext() {
10791079
backoffSleep(context, backOff);
10801080
}
10811081
}
1082+
10821083
continue;
10831084
}
10841085
span.addAnnotation("Stream broken. Not safe to retry");

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
575575
beforeReadOrQuery();
576576
final ExecuteSqlRequest.Builder builder =
577577
getExecuteSqlRequestBuilder(
578-
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
578+
statement,
579+
QueryMode.NORMAL,
580+
Options.fromUpdateOptions(options),
581+
/* withTransactionSelector = */ true);
579582
try {
580583
com.google.spanner.v1.ResultSet resultSet =
581584
rpc.executeQuery(builder.build(), session.getOptions());
@@ -599,7 +602,10 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
599602
beforeReadOrQuery();
600603
final ExecuteSqlRequest.Builder builder =
601604
getExecuteSqlRequestBuilder(
602-
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
605+
statement,
606+
QueryMode.NORMAL,
607+
Options.fromUpdateOptions(options),
608+
/* withTransactionSelector = */ true);
603609
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
604610
try {
605611
// Register the update as an async operation that must finish before the transaction may

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() {
9090
ExecuteSqlRequest request =
9191
context
9292
.getExecuteSqlRequestBuilder(
93-
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions())
93+
Statement.of("SELECT FOO FROM BAR"),
94+
QueryMode.NORMAL,
95+
Options.fromQueryOptions(),
96+
true)
9497
.build();
9598
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
9699
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
105108
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
106109
.build(),
107110
QueryMode.NORMAL,
108-
Options.fromQueryOptions())
111+
Options.fromQueryOptions(),
112+
true)
109113
.build();
110114
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
111115
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,130 @@ public Long run(TransactionContext transaction) throws Exception {
255255
assertThat(countTransactionsStarted()).isEqualTo(2);
256256
}
257257

258+
@Test
259+
public void testInlinedBeginFirstUpdateAborts() {
260+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
261+
long updateCount =
262+
client
263+
.readWriteTransaction()
264+
.run(
265+
new TransactionCallable<Long>() {
266+
boolean firstAttempt = true;
267+
268+
@Override
269+
public Long run(TransactionContext transaction) throws Exception {
270+
if (firstAttempt) {
271+
firstAttempt = false;
272+
mockSpanner.putStatementResult(
273+
StatementResult.exception(
274+
UPDATE_STATEMENT,
275+
mockSpanner.createAbortedException(
276+
ByteString.copyFromUtf8("some-tx"))));
277+
} else {
278+
mockSpanner.putStatementResult(
279+
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
280+
}
281+
return transaction.executeUpdate(UPDATE_STATEMENT);
282+
}
283+
});
284+
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
285+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
286+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
287+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
288+
}
289+
290+
@Test
291+
public void testInlinedBeginFirstQueryAborts() {
292+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
293+
long updateCount =
294+
client
295+
.readWriteTransaction()
296+
.run(
297+
new TransactionCallable<Long>() {
298+
boolean firstAttempt = true;
299+
300+
@Override
301+
public Long run(TransactionContext transaction) throws Exception {
302+
if (firstAttempt) {
303+
firstAttempt = false;
304+
mockSpanner.putStatementResult(
305+
StatementResult.exception(
306+
SELECT1,
307+
mockSpanner.createAbortedException(
308+
ByteString.copyFromUtf8("some-tx"))));
309+
} else {
310+
mockSpanner.putStatementResult(
311+
StatementResult.query(SELECT1, SELECT1_RESULTSET));
312+
}
313+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
314+
while (rs.next()) {
315+
return rs.getLong(0);
316+
}
317+
}
318+
return 0L;
319+
}
320+
});
321+
assertThat(updateCount).isEqualTo(1L);
322+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
323+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
324+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
325+
}
326+
327+
@Test
328+
public void testInlinedBeginFirstQueryReturnsUnavailable() {
329+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
330+
mockSpanner.setExecuteStreamingSqlExecutionTime(
331+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
332+
long value =
333+
client
334+
.readWriteTransaction()
335+
.run(
336+
new TransactionCallable<Long>() {
337+
@Override
338+
public Long run(TransactionContext transaction) throws Exception {
339+
// The first attempt will return UNAVAILABLE and retry internally.
340+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
341+
while (rs.next()) {
342+
return rs.getLong(0);
343+
}
344+
}
345+
return 0L;
346+
}
347+
});
348+
assertThat(value).isEqualTo(1L);
349+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
350+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
351+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
352+
}
353+
354+
@Test
355+
public void testInlinedBeginFirstReadReturnsUnavailable() {
356+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
357+
mockSpanner.setStreamingReadExecutionTime(
358+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
359+
long value =
360+
client
361+
.readWriteTransaction()
362+
.run(
363+
new TransactionCallable<Long>() {
364+
@Override
365+
public Long run(TransactionContext transaction) throws Exception {
366+
// The first attempt will return UNAVAILABLE and retry internally.
367+
try (ResultSet rs =
368+
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
369+
while (rs.next()) {
370+
return rs.getLong(0);
371+
}
372+
}
373+
return 0L;
374+
}
375+
});
376+
assertThat(value).isEqualTo(1L);
377+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
378+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
379+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
380+
}
381+
258382
@Test
259383
public void testInlinedBeginTxWithQuery() {
260384
DatabaseClient client =
@@ -283,8 +407,7 @@ public Long run(TransactionContext transaction) throws Exception {
283407

284408
@Test
285409
public void testInlinedBeginTxWithRead() {
286-
DatabaseClient client =
287-
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
410+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
288411
long updateCount =
289412
client
290413
.readWriteTransaction()

0 commit comments

Comments
 (0)