Skip to content

Commit 06b3d1a

Browse files
committed
---
yaml --- r: 28279 b: refs/heads/pubsub-ordering-keys c: 32f4e23 h: refs/heads/master i: 28277: 206887e 28275: a4f5829 28271: fe6dd7d
1 parent 0efea1b commit 06b3d1a

7 files changed

Lines changed: 872 additions & 33 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: cda0b03316ae6213d51f045c940d8a634dbcbcf2
158+
refs/heads/pubsub-ordering-keys: 32f4e23ae94c93b3680550a0b7febd9415d361e6
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 127 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,14 @@
4949
import com.google.pubsub.v1.TopicNames;
5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.EnumSet;
53+
import java.util.HashMap;
5254
import java.util.Iterator;
5355
import java.util.LinkedList;
5456
import java.util.List;
57+
import java.util.Map;
58+
import java.util.Set;
59+
import java.util.concurrent.Callable;
5560
import java.util.concurrent.ScheduledExecutorService;
5661
import java.util.concurrent.ScheduledFuture;
5762
import java.util.concurrent.TimeUnit;
@@ -85,15 +90,17 @@ public class Publisher {
8590
private final String topicName;
8691

8792
private final BatchingSettings batchingSettings;
93+
private final boolean enableMessageOrdering;
8894

8995
private final Lock messagesBatchLock;
90-
private MessagesBatch messagesBatch;
96+
final Map<String, MessagesBatch> messagesBatches;
9197

9298
private final AtomicBoolean activeAlarm;
9399

94100
private final PublisherStub publisherStub;
95101

96102
private final ScheduledExecutorService executor;
103+
final SequentialExecutorService.CallbackExecutor sequentialExecutor;
97104
private final AtomicBoolean shutdown;
98105
private final BackgroundResource backgroundResources;
99106
private final MessageWaiter messagesWaiter;
@@ -114,22 +121,46 @@ private Publisher(Builder builder) throws IOException {
114121
topicName = builder.topicName;
115122

116123
this.batchingSettings = builder.batchingSettings;
124+
this.enableMessageOrdering = builder.enableMessageOrdering;
117125
this.messageTransform = builder.messageTransform;
118126

119-
messagesBatch = new MessagesBatch(batchingSettings);
127+
messagesBatches = new HashMap<>();
120128
messagesBatchLock = new ReentrantLock();
121129
activeAlarm = new AtomicBoolean(false);
122130
executor = builder.executorProvider.getExecutor();
131+
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
123132
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
124133
if (builder.executorProvider.shouldAutoClose()) {
125134
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
126135
}
127136

128137
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129138
// We post-process this here to keep backward-compatibility.
130-
RetrySettings retrySettings = builder.retrySettings;
131-
if (retrySettings.getMaxAttempts() == 0) {
132-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
139+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
140+
// message infinitely rather than sending the next one.
141+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
142+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
143+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
144+
}
145+
if (enableMessageOrdering) {
146+
retrySettingsBuilder
147+
.setMaxAttempts(Integer.MAX_VALUE)
148+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
149+
}
150+
151+
Set<StatusCode.Code> retryCodes;
152+
if (enableMessageOrdering) {
153+
retryCodes = EnumSet.allOf(StatusCode.Code.class);
154+
} else {
155+
retryCodes =
156+
EnumSet.of(
157+
StatusCode.Code.ABORTED,
158+
StatusCode.Code.CANCELLED,
159+
StatusCode.Code.DEADLINE_EXCEEDED,
160+
StatusCode.Code.INTERNAL,
161+
StatusCode.Code.RESOURCE_EXHAUSTED,
162+
StatusCode.Code.UNKNOWN,
163+
StatusCode.Code.UNAVAILABLE);
133164
}
134165

135166
PublisherStubSettings.Builder stubSettings =
@@ -139,15 +170,8 @@ private Publisher(Builder builder) throws IOException {
139170
.setTransportChannelProvider(builder.channelProvider);
140171
stubSettings
141172
.publishSettings()
142-
.setRetryableCodes(
143-
StatusCode.Code.ABORTED,
144-
StatusCode.Code.CANCELLED,
145-
StatusCode.Code.DEADLINE_EXCEEDED,
146-
StatusCode.Code.INTERNAL,
147-
StatusCode.Code.RESOURCE_EXHAUSTED,
148-
StatusCode.Code.UNKNOWN,
149-
StatusCode.Code.UNAVAILABLE)
150-
.setRetrySettings(retrySettings)
173+
.setRetryableCodes(retryCodes)
174+
.setRetrySettings(retrySettingsBuilder.build())
151175
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
152176
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
153177
backgroundResourceList.add(publisherStub);
@@ -194,13 +218,27 @@ public String getTopicNameString() {
194218
public ApiFuture<String> publish(PubsubMessage message) {
195219
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
196220

221+
final String orderingKey = message.getOrderingKey();
222+
Preconditions.checkState(
223+
orderingKey.isEmpty() || enableMessageOrdering,
224+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
225+
197226
final OutstandingPublish outstandingPublish =
198227
new OutstandingPublish(messageTransform.apply(message));
199228
List<OutstandingBatch> batchesToSend;
200229
messagesBatchLock.lock();
201230
try {
231+
// Check if the next message makes the current batch exceed the max batch byte size.
232+
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
233+
if (messagesBatch == null) {
234+
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
235+
messagesBatches.put(orderingKey, messagesBatch);
236+
}
237+
202238
batchesToSend = messagesBatch.add(outstandingPublish);
203-
// Setup the next duration based delivery alarm if there are messages batched.
239+
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) {
240+
messagesBatches.remove(orderingKey);
241+
}
204242
setupAlarm();
205243
} finally {
206244
messagesBatchLock.unlock();
@@ -209,6 +247,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
209247
messagesWaiter.incrementPendingMessages(1);
210248

211249
if (!batchesToSend.isEmpty()) {
250+
publishAllWithoutInflight();
212251
for (final OutstandingBatch batch : batchesToSend) {
213252
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
214253
executor.execute(
@@ -225,7 +264,7 @@ public void run() {
225264
}
226265

227266
private void setupAlarm() {
228-
if (!messagesBatch.isEmpty()) {
267+
if (!messagesBatches.isEmpty()) {
229268
if (!activeAlarm.getAndSet(true)) {
230269
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
231270
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
@@ -236,7 +275,7 @@ private void setupAlarm() {
236275
public void run() {
237276
logger.log(Level.FINER, "Sending messages based on schedule.");
238277
activeAlarm.getAndSet(false);
239-
publishAllOutstanding();
278+
publishAllWithoutInflight();
240279
}
241280
},
242281
delayThresholdMs,
@@ -257,16 +296,49 @@ public void run() {
257296
*/
258297
public void publishAllOutstanding() {
259298
messagesBatchLock.lock();
260-
OutstandingBatch batchToSend;
261299
try {
262-
if (messagesBatch.isEmpty()) {
263-
return;
300+
for (MessagesBatch batch : messagesBatches.values()) {
301+
if (!batch.isEmpty()) {
302+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
303+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
304+
// while this function is running. This locking mechanism needs to be improved if it
305+
// causes any performance degradation.
306+
publishOutstandingBatch(batch.popOutstandingBatch());
307+
}
308+
}
309+
messagesBatches.clear();
310+
} finally {
311+
messagesBatchLock.unlock();
312+
}
313+
}
314+
315+
/**
316+
* Publish any outstanding batches if non-empty and there are no other batches in flight. This
317+
* method sends buffered messages, but does not wait for the send operations to complete. To wait
318+
* for messages to send, call {@code get} on the futures returned from {@code publish}.
319+
*/
320+
private void publishAllWithoutInflight() {
321+
messagesBatchLock.lock();
322+
try {
323+
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator();
324+
while (it.hasNext()) {
325+
Map.Entry<String, MessagesBatch> entry = it.next();
326+
MessagesBatch batch = entry.getValue();
327+
String key = entry.getKey();
328+
if (batch.isEmpty()) {
329+
it.remove();
330+
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) {
331+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
332+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
333+
// while this function is running. This locking mechanism needs to be improved if it
334+
// causes any performance degradation.
335+
publishOutstandingBatch(batch.popOutstandingBatch());
336+
it.remove();
337+
}
264338
}
265-
batchToSend = messagesBatch.popOutstandingBatch();
266339
} finally {
267340
messagesBatchLock.unlock();
268341
}
269-
publishOutstandingBatch(batchToSend);
270342
}
271343

272344
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
@@ -280,12 +352,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280352
}
281353

282354
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
283-
ApiFutureCallback<PublishResponse> futureCallback =
355+
final ApiFutureCallback<PublishResponse> futureCallback =
284356
new ApiFutureCallback<PublishResponse>() {
285357
@Override
286358
public void onSuccess(PublishResponse result) {
287359
try {
288-
if (result.getMessageIdsCount() != outstandingBatch.size()) {
360+
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) {
289361
outstandingBatch.onFailure(
290362
new IllegalStateException(
291363
String.format(
@@ -311,20 +383,36 @@ public void onFailure(Throwable t) {
311383
}
312384
};
313385

314-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
386+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
387+
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
388+
} else {
389+
// If ordering key is specified, publish the batch using the sequential executor.
390+
ApiFuture<PublishResponse> future =
391+
sequentialExecutor.submit(
392+
outstandingBatch.orderingKey,
393+
new Callable<ApiFuture<PublishResponse>>() {
394+
public ApiFuture<PublishResponse> call() {
395+
return publishCall(outstandingBatch);
396+
}
397+
});
398+
ApiFutures.addCallback(future, futureCallback, directExecutor());
399+
}
315400
}
316401

317402
private static final class OutstandingBatch {
318403
final List<OutstandingPublish> outstandingPublishes;
319404
final long creationTime;
320405
int attempt;
321406
int batchSizeBytes;
407+
final String orderingKey;
322408

323-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
409+
OutstandingBatch(
410+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
324411
this.outstandingPublishes = outstandingPublishes;
325412
attempt = 1;
326413
creationTime = System.currentTimeMillis();
327414
this.batchSizeBytes = batchSizeBytes;
415+
this.orderingKey = orderingKey;
328416
}
329417

330418
int size() {
@@ -468,7 +556,7 @@ public static final class Builder {
468556
.setRpcTimeoutMultiplier(2)
469557
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
470558
.build();
471-
559+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
472560
private static final int THREADS_PER_CPU = 5;
473561
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474562
InstantiatingExecutorProvider.newBuilder()
@@ -482,6 +570,8 @@ public static final class Builder {
482570

483571
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
484572

573+
private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
574+
485575
private TransportChannelProvider channelProvider =
486576
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
487577

@@ -576,6 +666,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576666
return this;
577667
}
578668

669+
/** Sets the message ordering option. */
670+
public Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
671+
this.enableMessageOrdering = enableMessageOrdering;
672+
return this;
673+
}
674+
579675
/** Gives the ability to set a custom executor to be used by the library. */
580676
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
581677
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -601,15 +697,17 @@ public Publisher build() throws IOException {
601697
private static class MessagesBatch {
602698
private List<OutstandingPublish> messages;
603699
private int batchedBytes;
700+
private String orderingKey;
604701
private final BatchingSettings batchingSettings;
605702

606-
public MessagesBatch(BatchingSettings batchingSettings) {
703+
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
607704
this.batchingSettings = batchingSettings;
705+
this.orderingKey = orderingKey;
608706
reset();
609707
}
610708

611709
private OutstandingBatch popOutstandingBatch() {
612-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
710+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
613711
reset();
614712
return batch;
615713
}

0 commit comments

Comments
 (0)