Skip to content

Commit 4ddc57a

Browse files
committed
---
yaml --- r: 28265 b: refs/heads/pubsub-ordering-keys c: dd8db2e h: refs/heads/master i: 28263: b91119f
1 parent 47601e2 commit 4ddc57a

7 files changed

Lines changed: 799 additions & 31 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 61b9e4fc725fda428fa6ec8ffe177cd86bf230a9
158+
refs/heads/pubsub-ordering-keys: dd8db2ec9e7787e082eda670060be378c738b228
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 95 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,15 @@
4949
import com.google.pubsub.v1.TopicNames;
5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.Collections;
53+
import java.util.EnumSet;
54+
import java.util.HashMap;
5255
import java.util.Iterator;
5356
import java.util.LinkedList;
5457
import java.util.List;
58+
import java.util.Map;
59+
import java.util.Set;
60+
import java.util.concurrent.Callable;
5561
import java.util.concurrent.ScheduledExecutorService;
5662
import java.util.concurrent.ScheduledFuture;
5763
import java.util.concurrent.TimeUnit;
@@ -85,15 +91,17 @@ public class Publisher {
8591
private final String topicName;
8692

8793
private final BatchingSettings batchingSettings;
94+
private final boolean enableMessageOrdering;
8895

8996
private final Lock messagesBatchLock;
90-
private MessagesBatch messagesBatch;
97+
private final Map<String, MessagesBatch> messagesBatches;
9198

9299
private final AtomicBoolean activeAlarm;
93100

94101
private final PublisherStub publisherStub;
95102

96103
private final ScheduledExecutorService executor;
104+
private final SequentialExecutorService.CallbackExecutor sequentialExecutor;
97105
private final AtomicBoolean shutdown;
98106
private final BackgroundResource backgroundResources;
99107
private final MessageWaiter messagesWaiter;
@@ -114,22 +122,46 @@ private Publisher(Builder builder) throws IOException {
114122
topicName = builder.topicName;
115123

116124
this.batchingSettings = builder.batchingSettings;
125+
this.enableMessageOrdering = builder.enableMessageOrdering;
117126
this.messageTransform = builder.messageTransform;
118127

119-
messagesBatch = new MessagesBatch(batchingSettings);
128+
messagesBatches = new HashMap<>();
120129
messagesBatchLock = new ReentrantLock();
121130
activeAlarm = new AtomicBoolean(false);
122131
executor = builder.executorProvider.getExecutor();
132+
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
123133
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
124134
if (builder.executorProvider.shouldAutoClose()) {
125135
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
126136
}
127137

128138
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129139
// We post-process this here to keep backward-compatibility.
130-
RetrySettings retrySettings = builder.retrySettings;
131-
if (retrySettings.getMaxAttempts() == 0) {
132-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
140+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
141+
// message infinitely rather than sending the next one.
142+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
143+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
144+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
145+
}
146+
if (enableMessageOrdering) {
147+
retrySettingsBuilder
148+
.setMaxAttempts(Integer.MAX_VALUE)
149+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
150+
}
151+
152+
Set<StatusCode.Code> retryCodes;
153+
if (enableMessageOrdering) {
154+
retryCodes = EnumSet.allOf(StatusCode.Code.class);
155+
} else {
156+
retryCodes =
157+
EnumSet.of(
158+
StatusCode.Code.ABORTED,
159+
StatusCode.Code.CANCELLED,
160+
StatusCode.Code.DEADLINE_EXCEEDED,
161+
StatusCode.Code.INTERNAL,
162+
StatusCode.Code.RESOURCE_EXHAUSTED,
163+
StatusCode.Code.UNKNOWN,
164+
StatusCode.Code.UNAVAILABLE);
133165
}
134166

135167
PublisherStubSettings.Builder stubSettings =
@@ -139,15 +171,8 @@ private Publisher(Builder builder) throws IOException {
139171
.setTransportChannelProvider(builder.channelProvider);
140172
stubSettings
141173
.publishSettings()
142-
.setRetryableCodes(
143-
StatusCode.Code.ABORTED,
144-
StatusCode.Code.CANCELLED,
145-
StatusCode.Code.DEADLINE_EXCEEDED,
146-
StatusCode.Code.INTERNAL,
147-
StatusCode.Code.RESOURCE_EXHAUSTED,
148-
StatusCode.Code.UNKNOWN,
149-
StatusCode.Code.UNAVAILABLE)
150-
.setRetrySettings(retrySettings)
174+
.setRetryableCodes(retryCodes)
175+
.setRetrySettings(retrySettingsBuilder.build())
151176
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
152177
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
153178
backgroundResourceList.add(publisherStub);
@@ -196,13 +221,25 @@ public ApiFuture<String> publish(PubsubMessage message) {
196221
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
197222
}
198223

224+
final String orderingKey = message.getOrderingKey();
225+
if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) {
226+
throw new IllegalStateException(
227+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
228+
}
229+
199230
final OutstandingPublish outstandingPublish =
200231
new OutstandingPublish(messageTransform.apply(message));
201232
List<OutstandingBatch> batchesToSend;
202233
messagesBatchLock.lock();
203234
try {
235+
// Check if the next message makes the current batch exceed the max batch byte size.
236+
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
237+
if (messagesBatch == null) {
238+
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
239+
messagesBatches.put(orderingKey, messagesBatch);
240+
}
241+
204242
batchesToSend = messagesBatch.add(outstandingPublish);
205-
// Setup the next duration based delivery alarm if there are messages batched.
206243
setupAlarm();
207244
} finally {
208245
messagesBatchLock.unlock();
@@ -211,6 +248,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
211248
messagesWaiter.incrementPendingMessages(1);
212249

213250
if (!batchesToSend.isEmpty()) {
251+
publishAllOutstanding();
214252
for (final OutstandingBatch batch : batchesToSend) {
215253
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
216254
executor.execute(
@@ -227,7 +265,7 @@ public void run() {
227265
}
228266

229267
private void setupAlarm() {
230-
if (!messagesBatch.isEmpty()) {
268+
if (!messagesBatches.isEmpty()) {
231269
if (!activeAlarm.getAndSet(true)) {
232270
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
233271
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
@@ -259,16 +297,20 @@ public void run() {
259297
*/
260298
public void publishAllOutstanding() {
261299
messagesBatchLock.lock();
262-
OutstandingBatch batchToSend;
263300
try {
264-
if (messagesBatch.isEmpty()) {
265-
return;
301+
for (MessagesBatch batch : messagesBatches.values()) {
302+
if (!batch.isEmpty()) {
303+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
304+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
305+
// while this function is running. This locking mechanism needs to be improved if it
306+
// causes any performance degradation.
307+
publishOutstandingBatch(batch.popOutstandingBatch());
308+
}
266309
}
267-
batchToSend = messagesBatch.popOutstandingBatch();
310+
messagesBatches.clear();
268311
} finally {
269312
messagesBatchLock.unlock();
270313
}
271-
publishOutstandingBatch(batchToSend);
272314
}
273315

274316
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
@@ -282,7 +324,7 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
282324
}
283325

284326
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
285-
ApiFutureCallback<PublishResponse> futureCallback =
327+
final ApiFutureCallback<PublishResponse> futureCallback =
286328
new ApiFutureCallback<PublishResponse>() {
287329
@Override
288330
public void onSuccess(PublishResponse result) {
@@ -323,20 +365,36 @@ public void onFailure(Throwable t) {
323365
}
324366
};
325367

326-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
368+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
369+
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
370+
} else {
371+
// If ordering key is specified, publish the batch using the sequential executor.
372+
sequentialExecutor.submit(
373+
outstandingBatch.orderingKey,
374+
new Callable<ApiFuture<PublishResponse>>() {
375+
public ApiFuture<PublishResponse> call() {
376+
ApiFuture<PublishResponse> future = publishCall(outstandingBatch);
377+
ApiFutures.addCallback(future, futureCallback, directExecutor());
378+
return future;
379+
}
380+
});
381+
}
327382
}
328383

329384
private static final class OutstandingBatch {
330385
final List<OutstandingPublish> outstandingPublishes;
331386
final long creationTime;
332387
int attempt;
333388
int batchSizeBytes;
389+
final String orderingKey;
334390

335-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
391+
OutstandingBatch(
392+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
336393
this.outstandingPublishes = outstandingPublishes;
337394
attempt = 1;
338395
creationTime = System.currentTimeMillis();
339396
this.batchSizeBytes = batchSizeBytes;
397+
this.orderingKey = orderingKey;
340398
}
341399

342400
int size() {
@@ -460,7 +518,7 @@ public static final class Builder {
460518
.setRpcTimeoutMultiplier(2)
461519
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
462520
.build();
463-
521+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
464522
private static final int THREADS_PER_CPU = 5;
465523
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
466524
InstantiatingExecutorProvider.newBuilder()
@@ -474,6 +532,8 @@ public static final class Builder {
474532

475533
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
476534

535+
boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
536+
477537
TransportChannelProvider channelProvider =
478538
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
479539

@@ -568,6 +628,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
568628
return this;
569629
}
570630

631+
/** Sets the message ordering option. */
632+
public Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
633+
this.enableMessageOrdering = enableMessageOrdering;
634+
return this;
635+
}
636+
571637
/** Gives the ability to set a custom executor to be used by the library. */
572638
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
573639
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -593,15 +659,17 @@ public Publisher build() throws IOException {
593659
private static class MessagesBatch {
594660
private List<OutstandingPublish> messages;
595661
private int batchedBytes;
662+
private String orderingKey;
596663
private final BatchingSettings batchingSettings;
597664

598-
public MessagesBatch(BatchingSettings batchingSettings) {
665+
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
599666
this.batchingSettings = batchingSettings;
667+
this.orderingKey = orderingKey;
600668
reset();
601669
}
602670

603671
private OutstandingBatch popOutstandingBatch() {
604-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
672+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
605673
reset();
606674
return batch;
607675
}

0 commit comments

Comments
 (0)