Skip to content

Commit 516ec9b

Browse files
committed
---
yaml --- r: 27259 b: refs/heads/pubsub-ordering-keys c: 75acad9 h: refs/heads/master i: 27257: ad1da67 27255: 223267a
1 parent ea1b80d commit 516ec9b

3 files changed

Lines changed: 77 additions & 64 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 956537c5ae22171b198568b729a32e42059db9ae
158+
refs/heads/pubsub-ordering-keys: 75acad92e69422d9ed62b6ea738905538ffcbd32
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -390,13 +390,15 @@ public void onFailure(Throwable t) {
390390
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
391391
} else {
392392
// If ordering key is specified, publish the batch using the sequential executor.
393-
sequentialExecutor.submit(outstandingBatch.orderingKey, new Callable<ApiFuture<PublishResponse>>() {
394-
public ApiFuture<PublishResponse> call() {
395-
ApiFuture<PublishResponse> future = publishCall(outstandingBatch);
396-
ApiFutures.addCallback(future, futureCallback, directExecutor());
397-
return future;
398-
}
399-
});
393+
sequentialExecutor.submit(
394+
outstandingBatch.orderingKey,
395+
new Callable<ApiFuture<PublishResponse>>() {
396+
public ApiFuture<PublishResponse> call() {
397+
ApiFuture<PublishResponse> future = publishCall(outstandingBatch);
398+
ApiFutures.addCallback(future, futureCallback, directExecutor());
399+
return future;
400+
}
401+
});
400402
}
401403
}
402404

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Queue;
3030
import java.util.concurrent.Callable;
3131
import java.util.concurrent.CancellationException;
32-
import java.util.concurrent.ConcurrentLinkedQueue;
3332
import java.util.concurrent.Executor;
3433

3534
interface CancellableRunnable extends Runnable {
@@ -89,20 +88,22 @@ protected void callNextTaskAsync(final String key) {
8988
}
9089
}
9190
if (executeTask) {
92-
executor.execute(new Runnable() {
93-
@Override public void run() {
94-
R task = null;
95-
synchronized (tasksByKey) {
96-
Queue<R> tasks = tasksByKey.get(key);
97-
if (tasks != null && !tasks.isEmpty()) {
98-
task = tasks.poll();
91+
executor.execute(
92+
new Runnable() {
93+
@Override
94+
public void run() {
95+
R task = null;
96+
synchronized (tasksByKey) {
97+
Queue<R> tasks = tasksByKey.get(key);
98+
if (tasks != null && !tasks.isEmpty()) {
99+
task = tasks.poll();
100+
}
101+
}
102+
if (task != null) {
103+
task.run();
104+
}
99105
}
100-
}
101-
if (task != null) {
102-
task.run();
103-
}
104-
}
105-
});
106+
});
106107
}
107108
}
108109
}
@@ -115,15 +116,18 @@ static class AutoExecutor extends SequentialExecutor<Runnable> {
115116

116117
/** Runs synchronous {@code Runnable} tasks sequentially. */
117118
void submit(final String key, final Runnable task) {
118-
super.execute(key, new Runnable() {
119-
@Override public void run() {
120-
try {
121-
task.run();
122-
} finally {
123-
callNextTaskAsync(key);
124-
}
125-
}
126-
});
119+
super.execute(
120+
key,
121+
new Runnable() {
122+
@Override
123+
public void run() {
124+
try {
125+
task.run();
126+
} finally {
127+
callNextTaskAsync(key);
128+
}
129+
}
130+
});
127131
}
128132
}
129133

@@ -133,8 +137,9 @@ void submit(final String key, final Runnable task) {
133137
*/
134138
@BetaApi
135139
static class CallbackExecutor extends SequentialExecutor<CancellableRunnable> {
136-
static CancellationException CANCELLATION_EXCEPTION = new CancellationException(
137-
"Execution cancelled because executing previous runnable failed.");
140+
static CancellationException CANCELLATION_EXCEPTION =
141+
new CancellationException(
142+
"Execution cancelled because executing previous runnable failed.");
138143

139144
CallbackExecutor(Executor executor) {
140145
super(executor);
@@ -177,41 +182,47 @@ <T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable)
177182

178183
// Step 2: create the CancellableRunnable
179184
// Step 3: add the task to queue via `execute`
180-
CancellableRunnable task = new CancellableRunnable() {
181-
private boolean cancelled = false;
182-
183-
@Override public void run() {
184-
// the task was cancelled
185-
if (cancelled) {
186-
return;
187-
}
188-
189-
try {
190-
// Step 4: call the `Callable`
191-
ApiFutureCallback<T> callback = new ApiFutureCallback<T>() {
192-
// Step 5.1: on success
193-
@Override public void onSuccess(T msg) {
194-
future.set(msg);
195-
callNextTaskAsync(key);
185+
CancellableRunnable task =
186+
new CancellableRunnable() {
187+
private boolean cancelled = false;
188+
189+
@Override
190+
public void run() {
191+
// the task was cancelled
192+
if (cancelled) {
193+
return;
196194
}
197195

198-
// Step 5.2: on failure
199-
@Override public void onFailure(Throwable e) {
200-
future.setException(e);
201-
cancelQueuedTasks(key, CANCELLATION_EXCEPTION);
196+
try {
197+
// Step 4: call the `Callable`
198+
ApiFutureCallback<T> callback =
199+
new ApiFutureCallback<T>() {
200+
// Step 5.1: on success
201+
@Override
202+
public void onSuccess(T msg) {
203+
future.set(msg);
204+
callNextTaskAsync(key);
205+
}
206+
207+
// Step 5.2: on failure
208+
@Override
209+
public void onFailure(Throwable e) {
210+
future.setException(e);
211+
cancelQueuedTasks(key, CANCELLATION_EXCEPTION);
212+
}
213+
};
214+
ApiFutures.addCallback(callable.call(), callback, directExecutor());
215+
} catch (Exception e) {
216+
cancel(e);
202217
}
203-
};
204-
ApiFutures.addCallback(callable.call(), callback, directExecutor());
205-
} catch (Exception e) {
206-
cancel(e);
207-
}
208-
}
218+
}
209219

210-
@Override public void cancel(Throwable e) {
211-
this.cancelled = true;
212-
future.setException(e);
213-
}
214-
};
220+
@Override
221+
public void cancel(Throwable e) {
222+
this.cancelled = true;
223+
future.setException(e);
224+
}
225+
};
215226
execute(key, task);
216227
return future;
217228
}

0 commit comments

Comments
 (0)