@@ -898,14 +898,25 @@ public Transaction call() throws Exception {
898898 }
899899 }
900900
901- private <T extends SessionTransaction > T setActive (@ Nullable T ctx ) {
901+ TransactionContextImpl newTransaction () {
902+ TransactionContextImpl txn = new TransactionContextImpl (this , readyTransactionId , rpc ,
903+ defaultPrefetchChunks );
904+ return txn ;
905+ }
906+
907+ <T extends SessionTransaction > T setActive (@ Nullable T ctx ) {
902908 if (activeTransaction != null ) {
903909 activeTransaction .invalidate ();
904910 }
905911 activeTransaction = ctx ;
906912 readyTransactionId = null ;
907913 return ctx ;
908914 }
915+
916+ @ Override
917+ public TransactionManager transactionManager () {
918+ return new TransactionManagerImpl (this );
919+ }
909920 }
910921
911922 /**
@@ -914,7 +925,7 @@ private <T extends SessionTransaction> T setActive(@Nullable T ctx) {
914925 * transactions, and read-write transactions. The defining characteristic is that a session may
915926 * only have one such transaction active at a time.
916927 */
917- private interface SessionTransaction {
928+ static interface SessionTransaction {
918929 /** Invalidates the transaction, generally because a new one has been started on the session. */
919930 void invalidate ();
920931 }
@@ -1018,7 +1029,7 @@ ResultSet executeQueryInternalWithOptions(
10181029 ExecuteSqlRequest .newBuilder ()
10191030 .setSql (statement .getSql ())
10201031 .setQueryMode (queryMode )
1021- .setSession (session .name );
1032+ .setSession (session .getName () );
10221033 Map <String , Value > stmtParameters = statement .getParameters ();
10231034 if (!stmtParameters .isEmpty ()) {
10241035 com .google .protobuf .Struct .Builder paramsBuilder = builder .getParamsBuilder ();
@@ -1217,10 +1228,7 @@ void backoffSleep(Context context, long backoffMillis) {
12171228 this .session = session ;
12181229 this .sleeper = sleeper ;
12191230 this .span = Tracing .getTracer ().getCurrentSpan ();
1220- ByteString transactionId = session .readyTransactionId ;
1221- session .readyTransactionId = null ;
1222- this .txn = new TransactionContextImpl (session , transactionId , rpc , defaultPrefetchChunks ,
1223- span );
1231+ this .txn = session .newTransaction ();
12241232 }
12251233
12261234 TransactionRunnerImpl (SessionImpl session , SpannerRpc rpc , int defaultPrefetchChunks ) {
@@ -1230,7 +1238,7 @@ void backoffSleep(Context context, long backoffMillis) {
12301238 @ Nullable
12311239 @ Override
12321240 public <T > T run (TransactionCallable <T > callable ) {
1233- try {
1241+ try ( Scope s = tracer . withSpan ( span )) {
12341242 return runInternal (callable );
12351243 } catch (RuntimeException e ) {
12361244 TraceUtil .endSpanWithFailure (span , e );
@@ -1244,6 +1252,7 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12441252 BackOff backoff = newBackOff ();
12451253 final Context context = Context .current ();
12461254 int attempt = 0 ;
1255+ // TODO: Change this to use TransactionManager.
12471256 while (true ) {
12481257 checkState (
12491258 isValid , "TransactionRunner has been invalidated by a new operation on the session" );
@@ -1318,7 +1327,7 @@ public void invalidate() {
13181327
13191328 private void backoff (Context context , BackOff backoff ) {
13201329 long delay = txn .getRetryDelayInMillis (backoff );
1321- txn = new TransactionContextImpl ( session , null , txn . rpc , txn . defaultPrefetchChunks , span );
1330+ txn = session . newTransaction ( );
13221331 span .addAnnotation ("Backing off" ,
13231332 ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (delay )));
13241333 sleeper .backoffSleep (context , delay );
@@ -1344,9 +1353,8 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
13441353 SessionImpl session ,
13451354 @ Nullable ByteString transactionId ,
13461355 SpannerRpc rpc ,
1347- int defaultPrefetchChunks ,
1348- Span span ) {
1349- super (session , rpc , defaultPrefetchChunks , span );
1356+ int defaultPrefetchChunks ) {
1357+ super (session , rpc , defaultPrefetchChunks );
13501358 this .transactionId = transactionId ;
13511359 }
13521360
0 commit comments