Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 4c786f1

Browse files
olavloitethiagotnunes
authored andcommitted
fix: UNAVAILABLE error on first query could cause transaction to get stuck (#807)
If the first query or read operation of a read/write transaction would return UNAVAILABLE for the first element of the result stream, the transaction could get stuck. This was caused by the internal retry mechanism that would wait for the initial attempt to return a transaction, which was never returned as the UNAVAILABLE exception was internally handled by the result stream iterator. Fixes #799
1 parent 0fd859d commit 4c786f1

5 files changed

Lines changed: 161 additions & 15 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -535,8 +535,6 @@ private ResultSet executeQueryInternal(
535535
* <li>Specific {@link QueryOptions} passed in for this query.
536536
* <li>Any value specified in a valid environment variable when the {@link SpannerOptions}
537537
* instance was created.
538-
* <li>The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database
539-
* where the query is executed.
540538
* </ol>
541539
*/
542540
@VisibleForTesting
@@ -554,7 +552,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
554552
return builder.build();
555553
}
556554

557-
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
555+
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
556+
Statement statement, QueryMode queryMode, boolean withTransactionSelector) {
558557
ExecuteSqlRequest.Builder builder =
559558
ExecuteSqlRequest.newBuilder()
560559
.setSql(statement.getSql())
@@ -568,9 +567,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
568567
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
569568
}
570569
}
571-
TransactionSelector selector = getTransactionSelector();
572-
if (selector != null) {
573-
builder.setTransaction(selector);
570+
if (withTransactionSelector) {
571+
TransactionSelector selector = getTransactionSelector();
572+
if (selector != null) {
573+
builder.setTransaction(selector);
574+
}
574575
}
575576
builder.setSeqno(getSeqNo());
576577
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -614,18 +615,26 @@ ResultSet executeQueryInternalWithOptions(
614615
beforeReadOrQuery();
615616
final int prefetchChunks =
616617
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
618+
final ExecuteSqlRequest.Builder request =
619+
getExecuteSqlRequestBuilder(
620+
statement, queryMode, /* withTransactionSelector = */ false);
617621
ResumableStreamIterator stream =
618622
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
619623
@Override
620624
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
621625
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
622-
final ExecuteSqlRequest.Builder request =
623-
getExecuteSqlRequestBuilder(statement, queryMode);
624626
if (partitionToken != null) {
625627
request.setPartitionToken(partitionToken);
626628
}
629+
TransactionSelector selector = null;
627630
if (resumeToken != null) {
628631
request.setResumeToken(resumeToken);
632+
selector = getTransactionSelector();
633+
} else if (!request.hasTransaction()) {
634+
selector = getTransactionSelector();
635+
}
636+
if (selector != null) {
637+
request.setTransaction(selector);
629638
}
630639
SpannerRpc.StreamingCall call =
631640
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -733,10 +742,13 @@ ResultSet readInternalWithOptions(
733742
@Override
734743
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
735744
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
745+
TransactionSelector selector = null;
736746
if (resumeToken != null) {
737747
builder.setResumeToken(resumeToken);
748+
selector = getTransactionSelector();
749+
} else if (!builder.hasTransaction()) {
750+
selector = getTransactionSelector();
738751
}
739-
TransactionSelector selector = getTransactionSelector();
740752
if (selector != null) {
741753
builder.setTransaction(selector);
742754
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,7 @@ protected PartialResultSet computeNext() {
10711071
backoffSleep(context, backOff);
10721072
}
10731073
}
1074+
10741075
continue;
10751076
}
10761077
span.addAnnotation("Stream broken. Not safe to retry");

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,10 @@ public void buffer(Iterable<Mutation> mutations) {
515515
public long executeUpdate(Statement statement) {
516516
beforeReadOrQuery();
517517
final ExecuteSqlRequest.Builder builder =
518-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
518+
getExecuteSqlRequestBuilder(
519+
statement,
520+
QueryMode.NORMAL,
521+
/* withTransactionSelector = */ true);
519522
try {
520523
com.google.spanner.v1.ResultSet resultSet =
521524
rpc.executeQuery(builder.build(), session.getOptions());
@@ -538,7 +541,10 @@ public long executeUpdate(Statement statement) {
538541
public ApiFuture<Long> executeUpdateAsync(Statement statement) {
539542
beforeReadOrQuery();
540543
final ExecuteSqlRequest.Builder builder =
541-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
544+
getExecuteSqlRequestBuilder(
545+
statement,
546+
QueryMode.NORMAL,
547+
/* withTransactionSelector = */ true);
542548
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
543549
try {
544550
// Register the update as an async operation that must finish before the transaction may

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ public void setup() {
8989
public void executeSqlRequestBuilderWithoutQueryOptions() {
9090
ExecuteSqlRequest request =
9191
context
92-
.getExecuteSqlRequestBuilder(Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL)
92+
.getExecuteSqlRequestBuilder(
93+
Statement.of("SELECT FOO FROM BAR"),
94+
QueryMode.NORMAL,
95+
true)
9396
.build();
9497
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
9598
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -103,7 +106,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
103106
Statement.newBuilder("SELECT FOO FROM BAR")
104107
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
105108
.build(),
106-
QueryMode.NORMAL)
109+
QueryMode.NORMAL,
110+
true)
107111
.build();
108112
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
109113
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,130 @@ public Long run(TransactionContext transaction) throws Exception {
251251
assertThat(countTransactionsStarted()).isEqualTo(2);
252252
}
253253

254+
@Test
255+
public void testInlinedBeginFirstUpdateAborts() {
256+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
257+
long updateCount =
258+
client
259+
.readWriteTransaction()
260+
.run(
261+
new TransactionCallable<Long>() {
262+
boolean firstAttempt = true;
263+
264+
@Override
265+
public Long run(TransactionContext transaction) throws Exception {
266+
if (firstAttempt) {
267+
firstAttempt = false;
268+
mockSpanner.putStatementResult(
269+
StatementResult.exception(
270+
UPDATE_STATEMENT,
271+
mockSpanner.createAbortedException(
272+
ByteString.copyFromUtf8("some-tx"))));
273+
} else {
274+
mockSpanner.putStatementResult(
275+
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
276+
}
277+
return transaction.executeUpdate(UPDATE_STATEMENT);
278+
}
279+
});
280+
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
281+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
282+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
283+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
284+
}
285+
286+
@Test
287+
public void testInlinedBeginFirstQueryAborts() {
288+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
289+
long updateCount =
290+
client
291+
.readWriteTransaction()
292+
.run(
293+
new TransactionCallable<Long>() {
294+
boolean firstAttempt = true;
295+
296+
@Override
297+
public Long run(TransactionContext transaction) throws Exception {
298+
if (firstAttempt) {
299+
firstAttempt = false;
300+
mockSpanner.putStatementResult(
301+
StatementResult.exception(
302+
SELECT1,
303+
mockSpanner.createAbortedException(
304+
ByteString.copyFromUtf8("some-tx"))));
305+
} else {
306+
mockSpanner.putStatementResult(
307+
StatementResult.query(SELECT1, SELECT1_RESULTSET));
308+
}
309+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
310+
while (rs.next()) {
311+
return rs.getLong(0);
312+
}
313+
}
314+
return 0L;
315+
}
316+
});
317+
assertThat(updateCount).isEqualTo(1L);
318+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
319+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
320+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
321+
}
322+
323+
@Test
324+
public void testInlinedBeginFirstQueryReturnsUnavailable() {
325+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
326+
mockSpanner.setExecuteStreamingSqlExecutionTime(
327+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
328+
long value =
329+
client
330+
.readWriteTransaction()
331+
.run(
332+
new TransactionCallable<Long>() {
333+
@Override
334+
public Long run(TransactionContext transaction) throws Exception {
335+
// The first attempt will return UNAVAILABLE and retry internally.
336+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
337+
while (rs.next()) {
338+
return rs.getLong(0);
339+
}
340+
}
341+
return 0L;
342+
}
343+
});
344+
assertThat(value).isEqualTo(1L);
345+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
346+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
347+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
348+
}
349+
350+
@Test
351+
public void testInlinedBeginFirstReadReturnsUnavailable() {
352+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
353+
mockSpanner.setStreamingReadExecutionTime(
354+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
355+
long value =
356+
client
357+
.readWriteTransaction()
358+
.run(
359+
new TransactionCallable<Long>() {
360+
@Override
361+
public Long run(TransactionContext transaction) throws Exception {
362+
// The first attempt will return UNAVAILABLE and retry internally.
363+
try (ResultSet rs =
364+
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
365+
while (rs.next()) {
366+
return rs.getLong(0);
367+
}
368+
}
369+
return 0L;
370+
}
371+
});
372+
assertThat(value).isEqualTo(1L);
373+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
374+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
375+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
376+
}
377+
254378
@Test
255379
public void testInlinedBeginTxWithQuery() {
256380
DatabaseClient client =
@@ -279,8 +403,7 @@ public Long run(TransactionContext transaction) throws Exception {
279403

280404
@Test
281405
public void testInlinedBeginTxWithRead() {
282-
DatabaseClient client =
283-
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
406+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
284407
long updateCount =
285408
client
286409
.readWriteTransaction()

0 commit comments

Comments
 (0)