Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit f2b3a60

Browse files
committed
test: add option to track the statement that starts a transaction
1 parent c8ef46f commit f2b3a60

7 files changed

Lines changed: 98 additions & 2 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ TransactionContextImpl newTransaction(Options options) {
348348
.setSession(this)
349349
.setTransactionId(readyTransactionId)
350350
.setOptions(options)
351+
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
351352
.setRpc(spanner.getRpc())
352353
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
353354
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
102102
private final DatabaseAdminStubSettings databaseAdminStubSettings;
103103
private final Duration partitionedDmlTimeout;
104104
private final boolean autoThrottleAdministrativeRequests;
105+
private final boolean trackTransactionStarter;
105106
/**
106107
* These are the default {@link QueryOptions} defined by the user on this {@link SpannerOptions}.
107108
*/
@@ -555,6 +556,7 @@ private SpannerOptions(Builder builder) {
555556
}
556557
partitionedDmlTimeout = builder.partitionedDmlTimeout;
557558
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
559+
trackTransactionStarter = builder.trackTransactionStarter;
558560
defaultQueryOptions = builder.defaultQueryOptions;
559561
envQueryOptions = builder.getEnvironmentQueryOptions();
560562
if (envQueryOptions.equals(QueryOptions.getDefaultInstance())) {
@@ -632,6 +634,7 @@ public static class Builder
632634
DatabaseAdminStubSettings.newBuilder();
633635
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
634636
private boolean autoThrottleAdministrativeRequests = false;
637+
private boolean trackTransactionStarter = false;
635638
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
636639
private CallCredentialsProvider callCredentialsProvider;
637640
private CloseableExecutorProvider asyncExecutorProvider;
@@ -678,6 +681,7 @@ private Builder() {
678681
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
679682
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
680683
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
684+
this.trackTransactionStarter = options.trackTransactionStarter;
681685
this.defaultQueryOptions = options.defaultQueryOptions;
682686
this.callCredentialsProvider = options.callCredentialsProvider;
683687
this.asyncExecutorProvider = options.asyncExecutorProvider;
@@ -889,6 +893,21 @@ public Builder setAutoThrottleAdministrativeRequests() {
889893
return this;
890894
}
891895

896+
/**
897+
* Instructs the client library to track the first request of each read/write transaction. This
898+
* statement will include a BeginTransaction option and will return a transaction id as part of
899+
* its result. All other statements in the same transaction must wait for this first statement
900+
* to finish before they can proceed. By setting this option the client library will throw a
901+
* {@link SpannerException} with {@link ErrorCode#DEADLINE_EXCEEDED} for any subsequent
902+
* statement that has waited for at least 60 seconds for the first statement to return a
903+
* transaction id, including the stacktrace of the initial statement that should have returned a
904+
* transaction id.
905+
*/
906+
public Builder setTrackTransactionStarter() {
907+
this.trackTransactionStarter = true;
908+
return this;
909+
}
910+
892911
/**
893912
* Sets the default {@link QueryOptions} that will be used for all queries on the specified
894913
* database. Query options can also be specified on a per-query basis and as environment
@@ -1081,6 +1100,10 @@ public boolean isAutoThrottleAdministrativeRequests() {
10811100
return autoThrottleAdministrativeRequests;
10821101
}
10831102

1103+
public boolean isTrackTransactionStarter() {
1104+
return trackTransactionStarter;
1105+
}
1106+
10841107
public CallCredentialsProvider getCallCredentialsProvider() {
10851108
return callCredentialsProvider;
10861109
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import java.util.concurrent.Callable;
6161
import java.util.concurrent.ExecutionException;
6262
import java.util.concurrent.Executor;
63+
import java.util.concurrent.TimeUnit;
64+
import java.util.concurrent.TimeoutException;
6365
import java.util.concurrent.atomic.AtomicInteger;
6466
import java.util.logging.Level;
6567
import java.util.logging.Logger;
@@ -76,6 +78,7 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
7678
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
7779
private ByteString transactionId;
7880
private Options options;
81+
private boolean trackTransactionStarter;
7982

8083
private Builder() {}
8184

@@ -89,6 +92,11 @@ Builder setOptions(Options options) {
8992
return self();
9093
}
9194

95+
Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
96+
this.trackTransactionStarter = trackTransactionStarter;
97+
return self();
98+
}
99+
92100
@Override
93101
TransactionContextImpl build() {
94102
Preconditions.checkState(this.options != null, "Options must be set");
@@ -170,13 +178,18 @@ public void removeListener(Runnable listener) {
170178
*/
171179
private volatile SettableApiFuture<ByteString> transactionIdFuture = null;
172180

181+
@VisibleForTesting long waitForTransactionTimeoutMillis = 60_000L;
182+
private final boolean trackTransactionStarter;
183+
private Exception transactionStarter;
184+
173185
volatile ByteString transactionId;
174186

175187
private Timestamp commitTimestamp;
176188

177189
private TransactionContextImpl(Builder builder) {
178190
super(builder);
179191
this.transactionId = builder.transactionId;
192+
this.trackTransactionStarter = builder.trackTransactionStarter;
180193
this.options = builder.options;
181194
this.finishedAsyncOperations.set(null);
182195
}
@@ -432,6 +445,9 @@ TransactionSelector getTransactionSelector() {
432445
// transactionIdFuture until an actual transactionId is available.
433446
if (transactionIdFuture == null) {
434447
transactionIdFuture = SettableApiFuture.create();
448+
if (trackTransactionStarter) {
449+
transactionStarter = new Exception("Requesting new transaction");
450+
}
435451
} else {
436452
tx = transactionIdFuture;
437453
}
@@ -447,7 +463,13 @@ TransactionSelector getTransactionSelector() {
447463
// Aborted error if the call that included the BeginTransaction option fails. The
448464
// Aborted error will cause the entire transaction to be retried, and the retry will use
449465
// a separate BeginTransaction RPC.
450-
TransactionSelector.newBuilder().setId(tx.get()).build();
466+
if (trackTransactionStarter) {
467+
TransactionSelector.newBuilder()
468+
.setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS))
469+
.build();
470+
} else {
471+
TransactionSelector.newBuilder().setId(tx.get()).build();
472+
}
451473
}
452474
} catch (ExecutionException e) {
453475
if (e.getCause() instanceof AbortedException) {
@@ -456,6 +478,17 @@ TransactionSelector getTransactionSelector() {
456478
}
457479
}
458480
throw SpannerExceptionFactory.newSpannerException(e.getCause());
481+
} catch (TimeoutException e) {
482+
SpannerException se =
483+
SpannerExceptionFactory.newSpannerException(
484+
ErrorCode.DEADLINE_EXCEEDED,
485+
"Timeout while waiting for a transaction to be returned by another statement. "
486+
+ "See the suppressed exception for the stacktrace of the caller that should return a transaction",
487+
e);
488+
if (transactionStarter != null) {
489+
se.addSuppressed(transactionStarter);
490+
}
491+
throw se;
459492
} catch (InterruptedException e) {
460493
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
461494
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/testing/RemoteSpannerHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public static RemoteSpannerHelper create(InstanceId instanceId) throws Throwable
146146
SpannerOptions.newBuilder()
147147
.setProjectId(instanceId.getProject())
148148
.setAutoThrottleAdministrativeRequests()
149+
.setTrackTransactionStarter()
149150
.build();
150151
Spanner client = options.getService();
151152
return new RemoteSpannerHelper(options, instanceId, client);

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ public GceTestEnvConfig() {
6464
boolean attemptDirectPath = Boolean.getBoolean(ATTEMPT_DIRECT_PATH);
6565
String directPathTestScenario = System.getProperty(DIRECT_PATH_TEST_SCENARIO, "");
6666
SpannerOptions.Builder builder =
67-
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
67+
SpannerOptions.newBuilder()
68+
.setAutoThrottleAdministrativeRequests()
69+
.setTrackTransactionStarter();
6870
if (!projectId.isEmpty()) {
6971
builder.setProjectId(projectId);
7072
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3737
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3838
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
39+
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
40+
import com.google.common.base.Throwables;
3941
import com.google.common.collect.ImmutableList;
4042
import com.google.common.util.concurrent.MoreExecutors;
4143
import com.google.protobuf.AbstractMessage;
@@ -193,6 +195,7 @@ public void setUp() throws IOException {
193195
.setProjectId("[PROJECT]")
194196
.setChannelProvider(channelProvider)
195197
.setCredentials(NoCredentials.getInstance())
198+
.setTrackTransactionStarter()
196199
.build()
197200
.getService();
198201
}
@@ -1313,6 +1316,38 @@ public Void run(TransactionContext transaction) throws Exception {
13131316
assertThat(request2.getResumeToken()).isNotEqualTo(ByteString.EMPTY);
13141317
}
13151318

1319+
@Test
1320+
public void testWaitForTransactionTimeout() {
1321+
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
1322+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1323+
try {
1324+
client
1325+
.readWriteTransaction()
1326+
.run(
1327+
new TransactionCallable<Void>() {
1328+
@Override
1329+
public Void run(TransactionContext transaction) throws Exception {
1330+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
1331+
impl.waitForTransactionTimeoutMillis = 1L;
1332+
transaction.executeUpdateAsync(UPDATE_STATEMENT);
1333+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
1334+
while (rs.next()) {}
1335+
}
1336+
return null;
1337+
}
1338+
});
1339+
fail("missing expected exception");
1340+
} catch (SpannerException e) {
1341+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
1342+
assertThat(e.getSuppressed()).hasLength(1);
1343+
assertThat(Throwables.getStackTraceAsString(e.getSuppressed()[0]))
1344+
.contains("TransactionContextImpl.executeUpdateAsync");
1345+
}
1346+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
1347+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1);
1348+
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
1349+
}
1350+
13161351
private int countRequests(Class<? extends AbstractMessage> requestType) {
13171352
int count = 0;
13181353
for (AbstractMessage msg : mockSpanner.getRequests()) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ public void inlineBegin() {
311311
when(spanner.getRpc()).thenReturn(rpc);
312312
when(spanner.getDefaultQueryOptions(Mockito.any(DatabaseId.class)))
313313
.thenReturn(QueryOptions.getDefaultInstance());
314+
when(spanner.getOptions()).thenReturn(mock(SpannerOptions.class));
314315
SessionImpl session =
315316
new SessionImpl(
316317
spanner, "projects/p/instances/i/databases/d/sessions/s", Collections.EMPTY_MAP) {

0 commit comments

Comments
 (0)