Skip to content

Commit 223267a

Browse files
committed
---
yaml --- r: 27255 b: refs/heads/pubsub-ordering-keys c: 97ba17c h: refs/heads/master i: 27253: eefc969 27251: 33d39e3 27247: 39a72d2
1 parent 24640f6 commit 223267a

2 files changed

Lines changed: 11 additions & 23 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: dd8a5cef1c2a6038210b87c6e5a581000cb14409
158+
refs/heads/pubsub-ordering-keys: 97ba17c86f930ff5226d2da4fffa26a183b03d65
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import java.util.concurrent.CancellationException;
3131
import java.util.concurrent.ConcurrentLinkedDeque;
3232
import java.util.concurrent.Executor;
33-
import java.util.logging.Level;
34-
import java.util.logging.Logger;
3533

3634
interface CancellableRunnable extends Runnable {
3735
void cancel(Throwable e);
@@ -52,19 +50,19 @@ private SequentialExecutorService() {}
5250
* tasks with the same key sequentially. Tasks with the same key will be run only when its
5351
* predecessor has been completed while tasks with different keys can be run in parallel.
5452
*/
55-
private abstract static class SequentialExecutor {
53+
private abstract static class SequentialExecutor<R extends Runnable> {
5654
// Maps keys to tasks.
57-
protected final Map<String, Deque<Runnable>> tasksByKey;
55+
protected final Map<String, Deque<R>> tasksByKey;
5856
protected final Executor executor;
5957

6058
private SequentialExecutor(Executor executor) {
6159
this.executor = executor;
6260
this.tasksByKey = new HashMap<>();
6361
}
6462

65-
protected void execute(final String key, Runnable task) {
63+
protected void execute(final String key, R task) {
6664
synchronized (tasksByKey) {
67-
Deque<Runnable> newTasks = tasksByKey.get(key);
65+
Deque<R> newTasks = tasksByKey.get(key);
6866
// If this key is already being handled, add it to the queue and return.
6967
if (newTasks != null) {
7068
newTasks.add(task);
@@ -84,7 +82,7 @@ protected void callNextTaskAsync(final String key) {
8482
new Runnable() {
8583
@Override
8684
public void run() {
87-
Deque<Runnable> tasks;
85+
Deque<R> tasks;
8886
synchronized (tasksByKey) {
8987
tasks = tasksByKey.get(key);
9088
if (tasks != null && tasks.isEmpty()) {
@@ -94,7 +92,7 @@ public void run() {
9492
}
9593
if (tasks != null) {
9694
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
97-
Runnable task = tasks.poll();
95+
R task = tasks.poll();
9896
if (task != null) {
9997
task.run();
10098
}
@@ -105,7 +103,7 @@ public void run() {
105103
}
106104

107105
@BetaApi
108-
static class AutoExecutor extends SequentialExecutor {
106+
static class AutoExecutor extends SequentialExecutor<Runnable> {
109107
AutoExecutor(Executor executor) {
110108
super(executor);
111109
}
@@ -126,10 +124,7 @@ void submit(final String key, final Runnable task) {
126124
* fails, other tasks with the same key that have not been executed will be cancelled.
127125
*/
128126
@BetaApi
129-
static class CallbackExecutor extends SequentialExecutor {
130-
private static final Logger logger =
131-
Logger.getLogger(SequentialExecutorService.SequentialExecutor.class.getName());
132-
127+
static class CallbackExecutor extends SequentialExecutor<CancellableRunnable> {
133128
CallbackExecutor(Executor executor) {
134129
super(executor);
135130
}
@@ -226,19 +221,12 @@ private void cancelQueuedTasks(final String key, Throwable e) {
226221
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
227222
// so that no more tasks are scheduled.
228223
synchronized (tasksByKey) {
229-
final Deque<Runnable> tasks = tasksByKey.get(key);
224+
final Deque<CancellableRunnable> tasks = tasksByKey.get(key);
230225
if (tasks == null) {
231226
return;
232227
}
233228
while (!tasks.isEmpty()) {
234-
Runnable task = tasks.poll();
235-
if (task instanceof CancellableRunnable) {
236-
((CancellableRunnable) task).cancel(e);
237-
} else {
238-
logger.log(
239-
Level.WARNING,
240-
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
241-
}
229+
tasks.poll().cancel(e);
242230
}
243231
}
244232
}

0 commit comments

Comments
 (0)