3030import java .util .concurrent .CancellationException ;
3131import java .util .concurrent .ConcurrentLinkedDeque ;
3232import java .util .concurrent .Executor ;
33- import java .util .logging .Level ;
34- import java .util .logging .Logger ;
3533
3634interface CancellableRunnable extends Runnable {
3735 void cancel (Throwable e );
@@ -52,19 +50,19 @@ private SequentialExecutorService() {}
5250 * tasks with the same key sequentially. Tasks with the same key will be run only when its
5351 * predecessor has been completed while tasks with different keys can be run in parallel.
5452 */
55- private abstract static class SequentialExecutor {
53+ private abstract static class SequentialExecutor < R extends Runnable > {
5654 // Maps keys to tasks.
57- protected final Map <String , Deque <Runnable >> tasksByKey ;
55+ protected final Map <String , Deque <R >> tasksByKey ;
5856 protected final Executor executor ;
5957
6058 private SequentialExecutor (Executor executor ) {
6159 this .executor = executor ;
6260 this .tasksByKey = new HashMap <>();
6361 }
6462
65- protected void execute (final String key , Runnable task ) {
63+ protected void execute (final String key , R task ) {
6664 synchronized (tasksByKey ) {
67- Deque <Runnable > newTasks = tasksByKey .get (key );
65+ Deque <R > newTasks = tasksByKey .get (key );
6866 // If this key is already being handled, add it to the queue and return.
6967 if (newTasks != null ) {
7068 newTasks .add (task );
@@ -84,7 +82,7 @@ protected void callNextTaskAsync(final String key) {
8482 new Runnable () {
8583 @ Override
8684 public void run () {
87- Deque <Runnable > tasks ;
85+ Deque <R > tasks ;
8886 synchronized (tasksByKey ) {
8987 tasks = tasksByKey .get (key );
9088 if (tasks != null && tasks .isEmpty ()) {
@@ -94,7 +92,7 @@ public void run() {
9492 }
9593 if (tasks != null ) {
9694 // TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
97- Runnable task = tasks .poll ();
95+ R task = tasks .poll ();
9896 if (task != null ) {
9997 task .run ();
10098 }
@@ -105,7 +103,7 @@ public void run() {
105103 }
106104
107105 @ BetaApi
108- static class AutoExecutor extends SequentialExecutor {
106+ static class AutoExecutor extends SequentialExecutor < Runnable > {
109107 AutoExecutor (Executor executor ) {
110108 super (executor );
111109 }
@@ -126,10 +124,7 @@ void submit(final String key, final Runnable task) {
126124 * fails, other tasks with the same key that have not been executed will be cancelled.
127125 */
128126 @ BetaApi
129- static class CallbackExecutor extends SequentialExecutor {
130- private static final Logger logger =
131- Logger .getLogger (SequentialExecutorService .SequentialExecutor .class .getName ());
132-
127+ static class CallbackExecutor extends SequentialExecutor <CancellableRunnable > {
133128 CallbackExecutor (Executor executor ) {
134129 super (executor );
135130 }
@@ -226,19 +221,12 @@ private void cancelQueuedTasks(final String key, Throwable e) {
226221 // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
227222 // so that no more tasks are scheduled.
228223 synchronized (tasksByKey ) {
229- final Deque <Runnable > tasks = tasksByKey .get (key );
224+ final Deque <CancellableRunnable > tasks = tasksByKey .get (key );
230225 if (tasks == null ) {
231226 return ;
232227 }
233228 while (!tasks .isEmpty ()) {
234- Runnable task = tasks .poll ();
235- if (task instanceof CancellableRunnable ) {
236- ((CancellableRunnable ) task ).cancel (e );
237- } else {
238- logger .log (
239- Level .WARNING ,
240- "Attempted to cancel Runnable that was not CancellableRunnable; ignored." );
241- }
229+ tasks .poll ().cancel (e );
242230 }
243231 }
244232 }
0 commit comments