Skip to content

Commit cf96e2a

Browse files
committed
---
yaml --- r: 35471 b: refs/heads/pubsub-ordering-keys c: 3ea3dc9 h: refs/heads/master i: 35469: 80d72e1 35467: 8b8308f 35463: 33bc576 35455: f201fec
1 parent 16848e0 commit cf96e2a

7 files changed

Lines changed: 978 additions & 39 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: cf6d5282c1e7d82b42d74bd7a88590ba5952dc3b
158+
refs/heads/pubsub-ordering-keys: 3ea3dc93288c90e1776339a2c059a076e2e2fd1d
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 133 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@
4949
import com.google.pubsub.v1.TopicNames;
5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.HashMap;
5253
import java.util.Iterator;
5354
import java.util.LinkedList;
5455
import java.util.List;
56+
import java.util.Map;
57+
import java.util.concurrent.Callable;
5558
import java.util.concurrent.ScheduledExecutorService;
5659
import java.util.concurrent.ScheduledFuture;
5760
import java.util.concurrent.TimeUnit;
@@ -85,15 +88,17 @@ public class Publisher {
8588
private final String topicName;
8689

8790
private final BatchingSettings batchingSettings;
91+
private final boolean enableMessageOrdering;
8892

8993
private final Lock messagesBatchLock;
90-
private MessagesBatch messagesBatch;
94+
final Map<String, MessagesBatch> messagesBatches;
9195

9296
private final AtomicBoolean activeAlarm;
9397

9498
private final PublisherStub publisherStub;
9599

96100
private final ScheduledExecutorService executor;
101+
final SequentialExecutorService.CallbackExecutor sequentialExecutor;
97102
private final AtomicBoolean shutdown;
98103
private final BackgroundResource backgroundResources;
99104
private final MessageWaiter messagesWaiter;
@@ -114,22 +119,33 @@ private Publisher(Builder builder) throws IOException {
114119
topicName = builder.topicName;
115120

116121
this.batchingSettings = builder.batchingSettings;
122+
this.enableMessageOrdering = builder.enableMessageOrdering;
117123
this.messageTransform = builder.messageTransform;
118124

119-
messagesBatch = new MessagesBatch(batchingSettings);
125+
messagesBatches = new HashMap<>();
120126
messagesBatchLock = new ReentrantLock();
121127
activeAlarm = new AtomicBoolean(false);
122128
executor = builder.executorProvider.getExecutor();
129+
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
123130
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
124131
if (builder.executorProvider.shouldAutoClose()) {
125132
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
126133
}
127134

128135
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129136
// We post-process this here to keep backward-compatibility.
130-
RetrySettings retrySettings = builder.retrySettings;
131-
if (retrySettings.getMaxAttempts() == 0) {
132-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
137+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
138+
// message infinitely rather than sending the next one.
139+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
140+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
141+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
142+
}
143+
if (enableMessageOrdering) {
144+
// TODO: is there a way to have the default retry settings for requests without an ordering
145+
// key?
146+
retrySettingsBuilder
147+
.setMaxAttempts(Integer.MAX_VALUE)
148+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
133149
}
134150

135151
PublisherStubSettings.Builder stubSettings =
@@ -147,7 +163,7 @@ private Publisher(Builder builder) throws IOException {
147163
StatusCode.Code.RESOURCE_EXHAUSTED,
148164
StatusCode.Code.UNKNOWN,
149165
StatusCode.Code.UNAVAILABLE)
150-
.setRetrySettings(retrySettings)
166+
.setRetrySettings(retrySettingsBuilder.build())
151167
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
152168
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
153169
backgroundResourceList.add(publisherStub);
@@ -194,38 +210,61 @@ public String getTopicNameString() {
194210
public ApiFuture<String> publish(PubsubMessage message) {
195211
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
196212

213+
final String orderingKey = message.getOrderingKey();
214+
Preconditions.checkState(
215+
orderingKey.isEmpty() || enableMessageOrdering,
216+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
217+
197218
final OutstandingPublish outstandingPublish =
198219
new OutstandingPublish(messageTransform.apply(message));
199220
List<OutstandingBatch> batchesToSend;
200221
messagesBatchLock.lock();
201222
try {
223+
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
224+
if (messagesBatch == null) {
225+
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
226+
messagesBatches.put(orderingKey, messagesBatch);
227+
}
228+
202229
batchesToSend = messagesBatch.add(outstandingPublish);
203-
// Setup the next duration based delivery alarm if there are messages batched.
230+
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) {
231+
messagesBatches.remove(orderingKey);
232+
}
204233
setupAlarm();
234+
if (!batchesToSend.isEmpty()) {
235+
// TODO: if this is not an ordering keys scenario, will this do anything?
236+
publishAllWithoutInflight();
237+
238+
// TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
239+
for (final OutstandingBatch batch : batchesToSend) {
240+
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
241+
publishOutstandingBatch(batch);
242+
}
243+
}
205244
} finally {
206245
messagesBatchLock.unlock();
207246
}
208247

209248
messagesWaiter.incrementPendingMessages(1);
210-
211-
if (!batchesToSend.isEmpty()) {
212-
for (final OutstandingBatch batch : batchesToSend) {
213-
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
214-
executor.execute(
215-
new Runnable() {
216-
@Override
217-
public void run() {
218-
publishOutstandingBatch(batch);
219-
}
220-
});
221-
}
222-
}
223-
224249
return outstandingPublish.publishResult;
225250
}
226251

252+
/**
253+
* There may be non-recoverable problems with a request for an ordering key. In that case, all
254+
* subsequent requests will fail until this method is called. If the key is not currently paused,
255+
* calling this method will be a no-op.
256+
*
257+
* @param key The key for which to resume publishing.
258+
*/
259+
// TODO: make this public when Ordering keys is live
260+
@BetaApi
261+
void resumePublish(String key) {
262+
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
263+
sequentialExecutor.resumePublish(key);
264+
}
265+
227266
private void setupAlarm() {
228-
if (!messagesBatch.isEmpty()) {
267+
if (!messagesBatches.isEmpty()) {
229268
if (!activeAlarm.getAndSet(true)) {
230269
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
231270
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
@@ -236,7 +275,7 @@ private void setupAlarm() {
236275
public void run() {
237276
logger.log(Level.FINER, "Sending messages based on schedule.");
238277
activeAlarm.getAndSet(false);
239-
publishAllOutstanding();
278+
publishAllWithoutInflight();
240279
}
241280
},
242281
delayThresholdMs,
@@ -257,16 +296,46 @@ public void run() {
257296
*/
258297
public void publishAllOutstanding() {
259298
messagesBatchLock.lock();
260-
OutstandingBatch batchToSend;
261299
try {
262-
if (messagesBatch.isEmpty()) {
263-
return;
300+
for (MessagesBatch batch : messagesBatches.values()) {
301+
if (!batch.isEmpty()) {
302+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
303+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
304+
// while this function is running. This locking mechanism needs to be improved if it
305+
// causes any performance degradation.
306+
publishOutstandingBatch(batch.popOutstandingBatch());
307+
}
308+
}
309+
messagesBatches.clear();
310+
} finally {
311+
messagesBatchLock.unlock();
312+
}
313+
}
314+
315+
/**
316+
* Publish any outstanding batches if non-empty and there are no other batches in flight. This
317+
* method sends buffered messages, but does not wait for the send operations to complete. To wait
318+
* for messages to send, call {@code get} on the futures returned from {@code publish}.
319+
*/
320+
private void publishAllWithoutInflight() {
321+
messagesBatchLock.lock();
322+
try {
323+
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator();
324+
while (it.hasNext()) {
325+
Map.Entry<String, MessagesBatch> entry = it.next();
326+
MessagesBatch batch = entry.getValue();
327+
String key = entry.getKey();
328+
if (batch.isEmpty()) {
329+
it.remove();
330+
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) {
331+
// TODO: Will this cause a performance problem for non-ordering keys scenarios?
332+
publishOutstandingBatch(batch.popOutstandingBatch());
333+
it.remove();
334+
}
264335
}
265-
batchToSend = messagesBatch.popOutstandingBatch();
266336
} finally {
267337
messagesBatchLock.unlock();
268338
}
269-
publishOutstandingBatch(batchToSend);
270339
}
271340

272341
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
@@ -280,12 +349,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280349
}
281350

282351
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
283-
ApiFutureCallback<PublishResponse> futureCallback =
352+
final ApiFutureCallback<PublishResponse> futureCallback =
284353
new ApiFutureCallback<PublishResponse>() {
285354
@Override
286355
public void onSuccess(PublishResponse result) {
287356
try {
288-
if (result.getMessageIdsCount() != outstandingBatch.size()) {
357+
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) {
289358
outstandingBatch.onFailure(
290359
new IllegalStateException(
291360
String.format(
@@ -311,20 +380,37 @@ public void onFailure(Throwable t) {
311380
}
312381
};
313382

314-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
383+
ApiFuture<PublishResponse> future;
384+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
385+
future = publishCall(outstandingBatch);
386+
} else {
387+
// If ordering key is specified, publish the batch using the sequential executor.
388+
future =
389+
sequentialExecutor.submit(
390+
outstandingBatch.orderingKey,
391+
new Callable<ApiFuture<PublishResponse>>() {
392+
public ApiFuture<PublishResponse> call() {
393+
return publishCall(outstandingBatch);
394+
}
395+
});
396+
}
397+
ApiFutures.addCallback(future, futureCallback, directExecutor());
315398
}
316399

317400
private static final class OutstandingBatch {
318401
final List<OutstandingPublish> outstandingPublishes;
319402
final long creationTime;
320403
int attempt;
321404
int batchSizeBytes;
405+
final String orderingKey;
322406

323-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
407+
OutstandingBatch(
408+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
324409
this.outstandingPublishes = outstandingPublishes;
325410
attempt = 1;
326411
creationTime = System.currentTimeMillis();
327412
this.batchSizeBytes = batchSizeBytes;
413+
this.orderingKey = orderingKey;
328414
}
329415

330416
int size() {
@@ -468,7 +554,7 @@ public static final class Builder {
468554
.setRpcTimeoutMultiplier(2)
469555
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
470556
.build();
471-
557+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
472558
private static final int THREADS_PER_CPU = 5;
473559
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474560
InstantiatingExecutorProvider.newBuilder()
@@ -482,6 +568,8 @@ public static final class Builder {
482568

483569
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
484570

571+
private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
572+
485573
private TransportChannelProvider channelProvider =
486574
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
487575

@@ -576,6 +664,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576664
return this;
577665
}
578666

667+
/** Sets the message ordering option. */
668+
// TODO: make this public when Ordering keys is live
669+
@BetaApi
670+
Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
671+
this.enableMessageOrdering = enableMessageOrdering;
672+
return this;
673+
}
674+
579675
/** Gives the ability to set a custom executor to be used by the library. */
580676
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
581677
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -601,15 +697,17 @@ public Publisher build() throws IOException {
601697
private static class MessagesBatch {
602698
private List<OutstandingPublish> messages;
603699
private int batchedBytes;
700+
private String orderingKey;
604701
private final BatchingSettings batchingSettings;
605702

606-
public MessagesBatch(BatchingSettings batchingSettings) {
703+
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
607704
this.batchingSettings = batchingSettings;
705+
this.orderingKey = orderingKey;
608706
reset();
609707
}
610708

611709
private OutstandingBatch popOutstandingBatch() {
612-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
710+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
613711
reset();
614712
return batch;
615713
}

0 commit comments

Comments
 (0)