3131import com .google .common .collect .Iterables ;
3232import com .google .common .collect .Sets ;
3333import com .google .datastore .v1 .ReadOptions .ReadConsistency ;
34+ import com .google .datastore .v1 .TransactionOptions ;
3435import com .google .protobuf .ByteString ;
3536import java .util .ArrayList ;
3637import java .util .Arrays ;
@@ -62,22 +63,74 @@ public Batch newBatch() {
6263 return new BatchImpl (this );
6364 }
6465
66+ @ Override
67+ public Transaction newTransaction (TransactionOptions transactionOptions ) {
68+ return new TransactionImpl (this , transactionOptions );
69+ }
70+
6571 @ Override
6672 public Transaction newTransaction () {
6773 return new TransactionImpl (this );
6874 }
6975
76+ static class ReadWriteTransactionCallable <T > implements Callable <T > {
77+ private final Datastore datastore ;
78+ private final TransactionCallable <T > callable ;
79+ private volatile TransactionOptions options ;
80+ private volatile Transaction transaction ;
81+
82+ ReadWriteTransactionCallable (Datastore datastore , TransactionCallable <T > callable , TransactionOptions options ) {
83+ this .datastore = datastore ;
84+ this .callable = callable ;
85+ this .options = options ;
86+ this .transaction = null ;
87+ }
88+
89+ Datastore getDatastore () {
90+ return datastore ;
91+ }
92+
93+ TransactionOptions getOptions () {
94+ return options ;
95+ }
96+
97+ Transaction getTransaction () {
98+ return transaction ;
99+ }
100+
101+ void setPrevTransactionId (ByteString transactionId ) {
102+ TransactionOptions .ReadWrite readWrite =
103+ TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
104+ options = options .toBuilder ().setReadWrite (readWrite ).build ();
105+ }
106+
107+ @ Override
108+ public T call () throws DatastoreException {
109+ transaction = datastore .newTransaction (options );
110+ try {
111+ T value = callable .run (transaction );
112+ transaction .commit ();
113+ return value ;
114+ } catch (Exception ex ) {
115+ transaction .rollback ();
116+ throw DatastoreException .propagateUserException (ex );
117+ } finally {
118+ if (transaction .isActive ()) {
119+ transaction .rollback ();
120+ }
121+ if (options != null && options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
122+ setPrevTransactionId (transaction .getTransactionId ());
123+ }
124+ }
125+ }
126+ }
127+
70128 @ Override
71129 public <T > T runInTransaction (final TransactionCallable <T > callable ) {
72130 final DatastoreImpl self = this ;
73131 try {
74132 return RetryHelper .runWithRetries (
75- new Callable <T >() {
76- @ Override
77- public T call () throws DatastoreException {
78- return DatastoreHelper .runInTransaction (self , callable );
79- }
80- },
133+ new ReadWriteTransactionCallable <T >(self , callable , null ),
81134 retrySettings ,
82135 TRANSACTION_EXCEPTION_HANDLER ,
83136 getOptions ().getClock ());
@@ -86,6 +139,20 @@ public T call() throws DatastoreException {
86139 }
87140 }
88141
142+ @ Override
143+ public <T > T runInTransaction (final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
144+ final DatastoreImpl self = this ;
145+ try {
146+ return RetryHelper .runWithRetries (
147+ new ReadWriteTransactionCallable <T >(self , callable , transactionOptions ),
148+ retrySettings ,
149+ TRANSACTION_EXCEPTION_HANDLER ,
150+ getOptions ().getClock ());
151+ } catch (RetryHelperException e ) {
152+ throw DatastoreException .translateAndThrow (e );
153+ }
154+ }
155+
89156 @ Override
90157 public <T > QueryResults <T > run (Query <T > query ) {
91158 return run (null , query );
0 commit comments