2929import java .util .Queue ;
3030import java .util .concurrent .Callable ;
3131import java .util .concurrent .CancellationException ;
32- import java .util .concurrent .ConcurrentLinkedQueue ;
3332import java .util .concurrent .Executor ;
3433
3534interface CancellableRunnable extends Runnable {
@@ -89,20 +88,22 @@ protected void callNextTaskAsync(final String key) {
8988 }
9089 }
9190 if (executeTask ) {
92- executor .execute (new Runnable () {
93- @ Override public void run () {
94- R task = null ;
95- synchronized (tasksByKey ) {
96- Queue <R > tasks = tasksByKey .get (key );
97- if (tasks != null && !tasks .isEmpty ()) {
98- task = tasks .poll ();
91+ executor .execute (
92+ new Runnable () {
93+ @ Override
94+ public void run () {
95+ R task = null ;
96+ synchronized (tasksByKey ) {
97+ Queue <R > tasks = tasksByKey .get (key );
98+ if (tasks != null && !tasks .isEmpty ()) {
99+ task = tasks .poll ();
100+ }
101+ }
102+ if (task != null ) {
103+ task .run ();
104+ }
99105 }
100- }
101- if (task != null ) {
102- task .run ();
103- }
104- }
105- });
106+ });
106107 }
107108 }
108109 }
@@ -115,15 +116,18 @@ static class AutoExecutor extends SequentialExecutor<Runnable> {
115116
116117 /** Runs synchronous {@code Runnable} tasks sequentially. */
117118 void submit (final String key , final Runnable task ) {
118- super .execute (key , new Runnable () {
119- @ Override public void run () {
120- try {
121- task .run ();
122- } finally {
123- callNextTaskAsync (key );
124- }
125- }
126- });
119+ super .execute (
120+ key ,
121+ new Runnable () {
122+ @ Override
123+ public void run () {
124+ try {
125+ task .run ();
126+ } finally {
127+ callNextTaskAsync (key );
128+ }
129+ }
130+ });
127131 }
128132 }
129133
@@ -133,8 +137,9 @@ void submit(final String key, final Runnable task) {
133137 */
134138 @ BetaApi
135139 static class CallbackExecutor extends SequentialExecutor <CancellableRunnable > {
136- static CancellationException CANCELLATION_EXCEPTION = new CancellationException (
137- "Execution cancelled because executing previous runnable failed." );
140+ static CancellationException CANCELLATION_EXCEPTION =
141+ new CancellationException (
142+ "Execution cancelled because executing previous runnable failed." );
138143
139144 CallbackExecutor (Executor executor ) {
140145 super (executor );
@@ -177,41 +182,47 @@ <T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable)
177182
178183 // Step 2: create the CancellableRunnable
179184 // Step 3: add the task to queue via `execute`
180- CancellableRunnable task = new CancellableRunnable () {
181- private boolean cancelled = false ;
182-
183- @ Override public void run () {
184- // the task was cancelled
185- if (cancelled ) {
186- return ;
187- }
188-
189- try {
190- // Step 4: call the `Callable`
191- ApiFutureCallback <T > callback = new ApiFutureCallback <T >() {
192- // Step 5.1: on success
193- @ Override public void onSuccess (T msg ) {
194- future .set (msg );
195- callNextTaskAsync (key );
185+ CancellableRunnable task =
186+ new CancellableRunnable () {
187+ private boolean cancelled = false ;
188+
189+ @ Override
190+ public void run () {
191+ // the task was cancelled
192+ if (cancelled ) {
193+ return ;
196194 }
197195
198- // Step 5.2: on failure
199- @ Override public void onFailure (Throwable e ) {
200- future .setException (e );
201- cancelQueuedTasks (key , CANCELLATION_EXCEPTION );
196+ try {
197+ // Step 4: call the `Callable`
198+ ApiFutureCallback <T > callback =
199+ new ApiFutureCallback <T >() {
200+ // Step 5.1: on success
201+ @ Override
202+ public void onSuccess (T msg ) {
203+ future .set (msg );
204+ callNextTaskAsync (key );
205+ }
206+
207+ // Step 5.2: on failure
208+ @ Override
209+ public void onFailure (Throwable e ) {
210+ future .setException (e );
211+ cancelQueuedTasks (key , CANCELLATION_EXCEPTION );
212+ }
213+ };
214+ ApiFutures .addCallback (callable .call (), callback , directExecutor ());
215+ } catch (Exception e ) {
216+ cancel (e );
202217 }
203- };
204- ApiFutures .addCallback (callable .call (), callback , directExecutor ());
205- } catch (Exception e ) {
206- cancel (e );
207- }
208- }
218+ }
209219
210- @ Override public void cancel (Throwable e ) {
211- this .cancelled = true ;
212- future .setException (e );
213- }
214- };
220+ @ Override
221+ public void cancel (Throwable e ) {
222+ this .cancelled = true ;
223+ future .setException (e );
224+ }
225+ };
215226 execute (key , task );
216227 return future ;
217228 }
0 commit comments