Skip to content

Commit d5cb520

Browse files
authored
---
yaml --- r: 27237 b: refs/heads/pubsub-ordering-keys c: 6be9e3d h: refs/heads/master i: 27235: 1b23581
1 parent e127220 commit d5cb520

2 files changed

Lines changed: 39 additions & 39 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 7f96c1a0e5257639e61ac513ee06d7d6179cd8a7
158+
refs/heads/pubsub-ordering-keys: 6be9e3dd09d797dc60025f54fa376cb9625ce159
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -101,35 +101,29 @@ void execute(final String key, Runnable task) {
101101

102102
protected abstract void execute(String key, Deque<Runnable> finalTasks);
103103

104-
/** Cancels every task in the queue assoicated with {@code key}. */
105-
void cancelQueuedTasks(final String key, Throwable e) {
106-
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
107-
// so that no more tasks are scheduled.
108-
synchronized (tasksByKey) {
109-
final Deque<Runnable> tasks = tasksByKey.get(key);
110-
if (tasks == null) {
111-
return;
112-
}
113-
while (!tasks.isEmpty()) {
114-
Runnable task = tasks.poll();
115-
if (task instanceof CancellableRunnable) {
116-
((CancellableRunnable) task).cancel(e);
117-
} else {
118-
logger.log(
119-
Level.WARNING,
120-
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
121-
}
122-
}
123-
}
124-
}
125-
126104
protected void invokeCallback(final Deque<Runnable> tasks) {
127105
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
128106
Runnable task = tasks.poll();
129107
if (task != null) {
130108
task.run();
131109
}
132110
}
111+
}
112+
113+
private static class AutoExecutor extends SequentialExecutor {
114+
AutoExecutor(Executor executor) {
115+
super(executor);
116+
}
117+
118+
protected void execute(final String key, final Deque<Runnable> finalTasks) {
119+
executor.execute(
120+
new Runnable() {
121+
@Override
122+
public void run() {
123+
invokeCallbackAndExecuteNext(key, finalTasks);
124+
}
125+
});
126+
}
133127

134128
protected void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
135129
invokeCallback(tasks);
@@ -153,22 +147,6 @@ public void run() {
153147
}
154148
}
155149

156-
private static class AutoExecutor extends SequentialExecutor {
157-
AutoExecutor(Executor executor) {
158-
super(executor);
159-
}
160-
161-
protected void execute(final String key, final Deque<Runnable> finalTasks) {
162-
executor.execute(
163-
new Runnable() {
164-
@Override
165-
public void run() {
166-
invokeCallbackAndExecuteNext(key, finalTasks);
167-
}
168-
});
169-
}
170-
}
171-
172150
private static class CallbackExecutor extends SequentialExecutor {
173151
CallbackExecutor(Executor executor) {
174152
super(executor);
@@ -255,4 +233,26 @@ public void run() {
255233
});
256234
}
257235
}
236+
237+
/** Cancels every task in the queue assoicated with {@code key}. */
238+
void cancelQueuedTasks(final String key, Throwable e) {
239+
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
240+
// so that no more tasks are scheduled.
241+
synchronized (tasksByKey) {
242+
final Deque<Runnable> tasks = tasksByKey.get(key);
243+
if (tasks == null) {
244+
return;
245+
}
246+
while (!tasks.isEmpty()) {
247+
Runnable task = tasks.poll();
248+
if (task instanceof CancellableRunnable) {
249+
((CancellableRunnable) task).cancel(e);
250+
} else {
251+
logger.log(
252+
Level.WARNING,
253+
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
254+
}
255+
}
256+
}
257+
}
258258
}

0 commit comments

Comments
 (0)