Skip to content

Commit 1d632a1

Browse files
committed
---
yaml --- r: 26237 b: refs/heads/pubsub-ordering-keys c: cc44d4b h: refs/heads/master i: 26235: a75735c
1 parent 9944365 commit 1d632a1

7 files changed

Lines changed: 847 additions & 60 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: b1ebd2d84384315f5788d1fa9f48c0a60265de20
158+
refs/heads/pubsub-ordering-keys: cc44d4bb4fc87f65d5f6f48093b52c63a2c39c99
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 125 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,14 @@
4848
import com.google.pubsub.v1.TopicNames;
4949
import java.io.IOException;
5050
import java.util.Collections;
51+
import java.util.EnumSet;
52+
import java.util.HashMap;
5153
import java.util.Iterator;
5254
import java.util.LinkedList;
5355
import java.util.List;
56+
import java.util.Map;
57+
import java.util.Set;
58+
import java.util.concurrent.Callable;
5459
import java.util.concurrent.ScheduledExecutorService;
5560
import java.util.concurrent.ScheduledFuture;
5661
import java.util.concurrent.TimeUnit;
@@ -84,15 +89,17 @@ public class Publisher {
8489
private final String topicName;
8590

8691
private final BatchingSettings batchingSettings;
92+
private final boolean enableMessageOrdering;
8793

8894
private final Lock messagesBatchLock;
89-
private MessagesBatch messagesBatch;
95+
private final Map<String, MessagesBatch> messagesBatches;
9096

9197
private final AtomicBoolean activeAlarm;
9298

9399
private final PublisherStub publisherStub;
94100

95101
private final ScheduledExecutorService executor;
102+
private final SequentialExecutorService<PublishResponse> sequentialExecutor;
96103
private final AtomicBoolean shutdown;
97104
private final List<AutoCloseable> closeables;
98105
private final MessageWaiter messagesWaiter;
@@ -113,12 +120,14 @@ private Publisher(Builder builder) throws IOException {
113120
topicName = builder.topicName;
114121

115122
this.batchingSettings = builder.batchingSettings;
123+
this.enableMessageOrdering = builder.enableMessageOrdering;
116124
this.messageTransform = builder.messageTransform;
117125

118-
messagesBatch = new MessagesBatch();
126+
messagesBatches = new HashMap<>();
119127
messagesBatchLock = new ReentrantLock();
120128
activeAlarm = new AtomicBoolean(false);
121129
executor = builder.executorProvider.getExecutor();
130+
sequentialExecutor = new SequentialExecutorService<>(executor);
122131
if (builder.executorProvider.shouldAutoClose()) {
123132
closeables =
124133
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
@@ -128,9 +137,31 @@ private Publisher(Builder builder) throws IOException {
128137

129138
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
130139
// We post-process this here to keep backward-compatibility.
131-
RetrySettings retrySettings = builder.retrySettings;
132-
if (retrySettings.getMaxAttempts() == 0) {
133-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
140+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
141+
// message infinitely rather than sending the next one.
142+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
143+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
144+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
145+
}
146+
if (enableMessageOrdering) {
147+
retrySettingsBuilder
148+
.setMaxAttempts(Integer.MAX_VALUE)
149+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
150+
}
151+
152+
Set<StatusCode.Code> retryCodes;
153+
if (enableMessageOrdering) {
154+
retryCodes = EnumSet.allOf(StatusCode.Code.class);
155+
} else {
156+
retryCodes =
157+
EnumSet.of(
158+
StatusCode.Code.ABORTED,
159+
StatusCode.Code.CANCELLED,
160+
StatusCode.Code.DEADLINE_EXCEEDED,
161+
StatusCode.Code.INTERNAL,
162+
StatusCode.Code.RESOURCE_EXHAUSTED,
163+
StatusCode.Code.UNKNOWN,
164+
StatusCode.Code.UNAVAILABLE);
134165
}
135166

136167
PublisherStubSettings.Builder stubSettings =
@@ -140,15 +171,8 @@ private Publisher(Builder builder) throws IOException {
140171
.setTransportChannelProvider(builder.channelProvider);
141172
stubSettings
142173
.publishSettings()
143-
.setRetryableCodes(
144-
StatusCode.Code.ABORTED,
145-
StatusCode.Code.CANCELLED,
146-
StatusCode.Code.DEADLINE_EXCEEDED,
147-
StatusCode.Code.INTERNAL,
148-
StatusCode.Code.RESOURCE_EXHAUSTED,
149-
StatusCode.Code.UNKNOWN,
150-
StatusCode.Code.UNAVAILABLE)
151-
.setRetrySettings(retrySettings)
174+
.setRetryableCodes(retryCodes)
175+
.setRetrySettings(retrySettingsBuilder.build())
152176
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
153177
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
154178

@@ -196,6 +220,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
196220
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
197221
}
198222

223+
final String orderingKey = message.getOrderingKey();
224+
if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) {
225+
throw new IllegalStateException(
226+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
227+
}
228+
199229
message = messageTransform.apply(message);
200230
final int messageSize = message.getSerializedSize();
201231
OutstandingBatch batchToSend = null;
@@ -204,30 +234,38 @@ public ApiFuture<String> publish(PubsubMessage message) {
204234
messagesBatchLock.lock();
205235
try {
206236
// Check if the next message makes the current batch exceed the max batch byte size.
207-
if (!messagesBatch.isEmpty()
237+
MessagesBatch batch = messagesBatches.get(orderingKey);
238+
if (batch == null) {
239+
batch = new MessagesBatch(orderingKey);
240+
messagesBatches.put(orderingKey, batch);
241+
}
242+
if (!batch.isEmpty()
208243
&& hasBatchingBytes()
209-
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
210-
batchToSend = messagesBatch.popOutstandingBatch();
244+
&& batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
245+
batchToSend = batch.popOutstandingBatch();
211246
}
212247

213248
// Border case if the message to send is greater or equals to the max batch size then can't
214249
// be included in the current batch and instead sent immediately.
215250
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
216-
messagesBatch.addMessage(outstandingPublish, messageSize);
217-
251+
batch.addMessage(outstandingPublish, messageSize);
218252
// If after adding the message we have reached the batch max messages then we have a batch
219253
// to send.
220-
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
221-
batchToSend = messagesBatch.popOutstandingBatch();
254+
if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
255+
batchToSend = batch.popOutstandingBatch();
222256
}
223257
}
258+
224259
// Setup the next duration based delivery alarm if there are messages batched.
225-
if (!messagesBatch.isEmpty()) {
260+
if (!batch.isEmpty()) {
226261
setupDurationBasedPublishAlarm();
227-
} else if (currentAlarmFuture != null) {
228-
logger.log(Level.FINER, "Cancelling alarm, no more messages");
229-
if (activeAlarm.getAndSet(false)) {
230-
currentAlarmFuture.cancel(false);
262+
} else {
263+
messagesBatches.remove(orderingKey);
264+
if (currentAlarmFuture != null) {
265+
logger.log(Level.FINER, "Cancelling alarm, no more messages");
266+
if (activeAlarm.getAndSet(false)) {
267+
currentAlarmFuture.cancel(false);
268+
}
231269
}
232270
}
233271
} finally {
@@ -238,29 +276,18 @@ && hasBatchingBytes()
238276

239277
if (batchToSend != null) {
240278
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
241-
final OutstandingBatch finalBatchToSend = batchToSend;
242-
executor.execute(
243-
new Runnable() {
244-
@Override
245-
public void run() {
246-
publishOutstandingBatch(finalBatchToSend);
247-
}
248-
});
279+
publishAllOutstanding();
280+
publishOutstandingBatch(batchToSend);
249281
}
250282

251283
// If the message is over the size limit, it was not added to the pending messages and it will
252284
// be sent in its own batch immediately.
253285
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
254286
logger.log(
255287
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
256-
executor.execute(
257-
new Runnable() {
258-
@Override
259-
public void run() {
260-
publishOutstandingBatch(
261-
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
262-
}
263-
});
288+
publishAllOutstanding();
289+
publishOutstandingBatch(
290+
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey));
264291
}
265292

266293
return publishResult;
@@ -292,27 +319,33 @@ public void run() {
292319
*/
293320
public void publishAllOutstanding() {
294321
messagesBatchLock.lock();
295-
OutstandingBatch batchToSend;
296322
try {
297-
if (messagesBatch.isEmpty()) {
298-
return;
323+
for (MessagesBatch batch : messagesBatches.values()) {
324+
if (!batch.isEmpty()) {
325+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
326+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
327+
// while this function is running. This locking mechanism needs to be improved if it
328+
// causes any performance degradation.
329+
publishOutstandingBatch(batch.popOutstandingBatch());
330+
}
299331
}
300-
batchToSend = messagesBatch.popOutstandingBatch();
332+
messagesBatches.clear();
301333
} finally {
302334
messagesBatchLock.unlock();
303335
}
304-
publishOutstandingBatch(batchToSend);
305336
}
306337

307-
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
338+
private ApiFuture publishCall(final OutstandingBatch outstandingBatch) {
308339
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
309340
publishRequest.setTopic(topicName);
310341
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
311342
publishRequest.addMessages(outstandingPublish.message);
312343
}
344+
return publisherStub.publishCallable().futureCall(publishRequest.build());
345+
}
313346

314-
ApiFutures.addCallback(
315-
publisherStub.publishCallable().futureCall(publishRequest.build()),
347+
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
348+
final ApiFutureCallback<PublishResponse> futureCallback =
316349
new ApiFutureCallback<PublishResponse>() {
317350
@Override
318351
public void onSuccess(PublishResponse result) {
@@ -351,21 +384,44 @@ public void onFailure(Throwable t) {
351384
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
352385
}
353386
}
354-
},
355-
directExecutor());
387+
};
388+
389+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
390+
// If ordering key is empty, publish the batch using the normal executor.
391+
Runnable task =
392+
new Runnable() {
393+
public void run() {
394+
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
395+
}
396+
};
397+
executor.execute(task);
398+
} else {
399+
// If ordering key is specified, publish the batch using the sequential executor.
400+
Callable<ApiFuture> func =
401+
new Callable<ApiFuture>() {
402+
public ApiFuture call() {
403+
return publishCall(outstandingBatch);
404+
}
405+
};
406+
ApiFutures.addCallback(
407+
sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback);
408+
}
356409
}
357410

358411
private static final class OutstandingBatch {
359412
final List<OutstandingPublish> outstandingPublishes;
360413
final long creationTime;
361414
int attempt;
362415
int batchSizeBytes;
416+
final String orderingKey;
363417

364-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
418+
OutstandingBatch(
419+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
365420
this.outstandingPublishes = outstandingPublishes;
366421
attempt = 1;
367422
creationTime = System.currentTimeMillis();
368423
this.batchSizeBytes = batchSizeBytes;
424+
this.orderingKey = orderingKey;
369425
}
370426

371427
public int getAttempt() {
@@ -503,7 +559,7 @@ public static final class Builder {
503559
.setRpcTimeoutMultiplier(2)
504560
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
505561
.build();
506-
562+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
507563
private static final int THREADS_PER_CPU = 5;
508564
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
509565
InstantiatingExecutorProvider.newBuilder()
@@ -517,6 +573,8 @@ public static final class Builder {
517573

518574
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
519575

576+
boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
577+
520578
TransportChannelProvider channelProvider =
521579
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
522580

@@ -611,6 +669,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
611669
return this;
612670
}
613671

672+
/** Sets the message ordering option. */
673+
public Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
674+
this.enableMessageOrdering = enableMessageOrdering;
675+
return this;
676+
}
677+
614678
/** Gives the ability to set a custom executor to be used by the library. */
615679
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
616680
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -636,9 +700,14 @@ public Publisher build() throws IOException {
636700
private static class MessagesBatch {
637701
private List<OutstandingPublish> messages = new LinkedList<>();
638702
private int batchedBytes;
703+
private String orderingKey;
704+
705+
public MessagesBatch(String orderingKey) {
706+
this.orderingKey = orderingKey;
707+
}
639708

640-
private OutstandingBatch popOutstandingBatch() {
641-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
709+
public OutstandingBatch popOutstandingBatch() {
710+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
642711
reset();
643712
return batch;
644713
}

0 commit comments

Comments
 (0)