Skip to content

Commit 1d3c42c

Browse files
committed
---
yaml --- r: 35351 b: refs/heads/pubsub-ordering-keys c: 2400120 h: refs/heads/master i: 35349: 5f5f838 35347: be7ac39 35343: b12b50d
1 parent 40f862d commit 1d3c42c

7 files changed

Lines changed: 979 additions & 25 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 7ef8b85b7d1bd06fbf0f145f73ca9825b01589bb
158+
refs/heads/pubsub-ordering-keys: 2400120cc562ad95c011812a1ff69ca734b19255
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 134 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,13 @@
4949
import com.google.pubsub.v1.TopicNames;
5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.EnumSet;
53+
import java.util.HashMap;
5254
import java.util.Iterator;
5355
import java.util.LinkedList;
5456
import java.util.List;
57+
import java.util.Map;
58+
import java.util.concurrent.Callable;
5559
import java.util.concurrent.ScheduledExecutorService;
5660
import java.util.concurrent.ScheduledFuture;
5761
import java.util.concurrent.TimeUnit;
@@ -85,15 +89,17 @@ public class Publisher {
8589
private final String topicName;
8690

8791
private final BatchingSettings batchingSettings;
92+
private final boolean enableMessageOrdering;
8893

8994
private final Lock messagesBatchLock;
90-
private MessagesBatch messagesBatch;
95+
final Map<String, MessagesBatch> messagesBatches;
9196

9297
private final AtomicBoolean activeAlarm;
9398

9499
private final PublisherStub publisherStub;
95100

96101
private final ScheduledExecutorService executor;
102+
final SequentialExecutorService.CallbackExecutor sequentialExecutor;
97103
private final AtomicBoolean shutdown;
98104
private final BackgroundResource backgroundResources;
99105
private final MessageWaiter messagesWaiter;
@@ -114,22 +120,33 @@ private Publisher(Builder builder) throws IOException {
114120
topicName = builder.topicName;
115121

116122
this.batchingSettings = builder.batchingSettings;
123+
this.enableMessageOrdering = builder.enableMessageOrdering;
117124
this.messageTransform = builder.messageTransform;
118125

119-
messagesBatch = new MessagesBatch(batchingSettings);
126+
messagesBatches = new HashMap<>();
120127
messagesBatchLock = new ReentrantLock();
121128
activeAlarm = new AtomicBoolean(false);
122129
executor = builder.executorProvider.getExecutor();
130+
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
123131
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
124132
if (builder.executorProvider.shouldAutoClose()) {
125133
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
126134
}
127135

128136
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129137
// We post-process this here to keep backward-compatibility.
130-
RetrySettings retrySettings = builder.retrySettings;
131-
if (retrySettings.getMaxAttempts() == 0) {
132-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
138+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
139+
// message infinitely rather than sending the next one.
140+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
141+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
142+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
143+
}
144+
if (enableMessageOrdering) {
145+
// TODO: is there a way to have the default retry settings for requests without an ordering
146+
// key?
147+
retrySettingsBuilder
148+
.setMaxAttempts(Integer.MAX_VALUE)
149+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
133150
}
134151

135152
PublisherStubSettings.Builder stubSettings =
@@ -147,7 +164,7 @@ private Publisher(Builder builder) throws IOException {
147164
StatusCode.Code.RESOURCE_EXHAUSTED,
148165
StatusCode.Code.UNKNOWN,
149166
StatusCode.Code.UNAVAILABLE)
150-
.setRetrySettings(retrySettings)
167+
.setRetrySettings(retrySettingsBuilder.build())
151168
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
152169
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
153170
backgroundResourceList.add(publisherStub);
@@ -194,13 +211,27 @@ public String getTopicNameString() {
194211
public ApiFuture<String> publish(PubsubMessage message) {
195212
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
196213

214+
final String orderingKey = message.getOrderingKey();
215+
Preconditions.checkState(
216+
orderingKey.isEmpty() || enableMessageOrdering,
217+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
218+
197219
final OutstandingPublish outstandingPublish =
198220
new OutstandingPublish(messageTransform.apply(message));
199221
List<OutstandingBatch> batchesToSend;
200222
messagesBatchLock.lock();
201223
try {
224+
// Check if the next message makes the current batch exceed the max batch byte size.
225+
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
226+
if (messagesBatch == null) {
227+
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
228+
messagesBatches.put(orderingKey, messagesBatch);
229+
}
230+
202231
batchesToSend = messagesBatch.add(outstandingPublish);
203-
// Setup the next duration based delivery alarm if there are messages batched.
232+
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) {
233+
messagesBatches.remove(orderingKey);
234+
}
204235
setupAlarm();
205236
} finally {
206237
messagesBatchLock.unlock();
@@ -209,6 +240,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
209240
messagesWaiter.incrementPendingMessages(1);
210241

211242
if (!batchesToSend.isEmpty()) {
243+
// TODO: if this is not an ordering keys scenario, will this do anything?
244+
publishAllWithoutInflight();
245+
246+
// TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
212247
for (final OutstandingBatch batch : batchesToSend) {
213248
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
214249
executor.execute(
@@ -224,8 +259,22 @@ public void run() {
224259
return outstandingPublish.publishResult;
225260
}
226261

262+
/**
263+
* There may be non-recoverable problems with a request for an ordering key. In that case, all
264+
* subsequent requests will fail until this method is called. If the key is not currently paused,
265+
* calling this method will be a no-op.
266+
*
267+
* @param key The key for which to resume publishing.
268+
*/
269+
// TODO: make this public when Ordering keys is live
270+
@BetaApi
271+
void resumePublish(String key) {
272+
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
273+
sequentialExecutor.resumePublish(key);
274+
}
275+
227276
private void setupAlarm() {
228-
if (!messagesBatch.isEmpty()) {
277+
if (!messagesBatches.isEmpty()) {
229278
if (!activeAlarm.getAndSet(true)) {
230279
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
231280
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
@@ -236,7 +285,7 @@ private void setupAlarm() {
236285
public void run() {
237286
logger.log(Level.FINER, "Sending messages based on schedule.");
238287
activeAlarm.getAndSet(false);
239-
publishAllOutstanding();
288+
publishAllWithoutInflight();
240289
}
241290
},
242291
delayThresholdMs,
@@ -257,16 +306,51 @@ public void run() {
257306
*/
258307
public void publishAllOutstanding() {
259308
messagesBatchLock.lock();
260-
OutstandingBatch batchToSend;
261309
try {
262-
if (messagesBatch.isEmpty()) {
263-
return;
310+
for (MessagesBatch batch : messagesBatches.values()) {
311+
if (!batch.isEmpty()) {
312+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
313+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
314+
// while this function is running. This locking mechanism needs to be improved if it
315+
// causes any performance degradation.
316+
publishOutstandingBatch(batch.popOutstandingBatch());
317+
}
318+
}
319+
messagesBatches.clear();
320+
} finally {
321+
messagesBatchLock.unlock();
322+
}
323+
}
324+
325+
/**
326+
* Publish any outstanding batches if non-empty and there are no other batches in flight. This
327+
* method sends buffered messages, but does not wait for the send operations to complete. To wait
328+
* for messages to send, call {@code get} on the futures returned from {@code publish}.
329+
*/
330+
private void publishAllWithoutInflight() {
331+
messagesBatchLock.lock();
332+
try {
333+
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator();
334+
while (it.hasNext()) {
335+
Map.Entry<String, MessagesBatch> entry = it.next();
336+
MessagesBatch batch = entry.getValue();
337+
String key = entry.getKey();
338+
if (batch.isEmpty()) {
339+
it.remove();
340+
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) {
341+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
342+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
343+
// while this function is running. This locking mechanism needs to be improved if it
344+
// causes any performance degradation.
345+
346+
// TODO: Will this cause a performance problem for non-ordering keys scenarios?
347+
publishOutstandingBatch(batch.popOutstandingBatch());
348+
it.remove();
349+
}
264350
}
265-
batchToSend = messagesBatch.popOutstandingBatch();
266351
} finally {
267352
messagesBatchLock.unlock();
268353
}
269-
publishOutstandingBatch(batchToSend);
270354
}
271355

272356
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
@@ -280,12 +364,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280364
}
281365

282366
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
283-
ApiFutureCallback<PublishResponse> futureCallback =
367+
final ApiFutureCallback<PublishResponse> futureCallback =
284368
new ApiFutureCallback<PublishResponse>() {
285369
@Override
286370
public void onSuccess(PublishResponse result) {
287371
try {
288-
if (result.getMessageIdsCount() != outstandingBatch.size()) {
372+
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) {
289373
outstandingBatch.onFailure(
290374
new IllegalStateException(
291375
String.format(
@@ -311,20 +395,37 @@ public void onFailure(Throwable t) {
311395
}
312396
};
313397

314-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
398+
ApiFuture<PublishResponse> future;
399+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
400+
future = publishCall(outstandingBatch);
401+
} else {
402+
// If ordering key is specified, publish the batch using the sequential executor.
403+
future =
404+
sequentialExecutor.submit(
405+
outstandingBatch.orderingKey,
406+
new Callable<ApiFuture<PublishResponse>>() {
407+
public ApiFuture<PublishResponse> call() {
408+
return publishCall(outstandingBatch);
409+
}
410+
});
411+
}
412+
ApiFutures.addCallback(future, futureCallback, directExecutor());
315413
}
316414

317415
private static final class OutstandingBatch {
318416
final List<OutstandingPublish> outstandingPublishes;
319417
final long creationTime;
320418
int attempt;
321419
int batchSizeBytes;
420+
final String orderingKey;
322421

323-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
422+
OutstandingBatch(
423+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
324424
this.outstandingPublishes = outstandingPublishes;
325425
attempt = 1;
326426
creationTime = System.currentTimeMillis();
327427
this.batchSizeBytes = batchSizeBytes;
428+
this.orderingKey = orderingKey;
328429
}
329430

330431
int size() {
@@ -468,7 +569,7 @@ public static final class Builder {
468569
.setRpcTimeoutMultiplier(2)
469570
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
470571
.build();
471-
572+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
472573
private static final int THREADS_PER_CPU = 5;
473574
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474575
InstantiatingExecutorProvider.newBuilder()
@@ -482,6 +583,8 @@ public static final class Builder {
482583

483584
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
484585

586+
private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
587+
485588
private TransportChannelProvider channelProvider =
486589
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
487590

@@ -576,6 +679,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576679
return this;
577680
}
578681

682+
/** Sets the message ordering option. */
683+
// TODO: make this public when Ordering keys is live
684+
@BetaApi
685+
Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
686+
this.enableMessageOrdering = enableMessageOrdering;
687+
return this;
688+
}
689+
579690
/** Gives the ability to set a custom executor to be used by the library. */
580691
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
581692
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -601,15 +712,17 @@ public Publisher build() throws IOException {
601712
private static class MessagesBatch {
602713
private List<OutstandingPublish> messages;
603714
private int batchedBytes;
715+
private String orderingKey;
604716
private final BatchingSettings batchingSettings;
605717

606-
public MessagesBatch(BatchingSettings batchingSettings) {
718+
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
607719
this.batchingSettings = batchingSettings;
720+
this.orderingKey = orderingKey;
608721
reset();
609722
}
610723

611724
private OutstandingBatch popOutstandingBatch() {
612-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
725+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
613726
reset();
614727
return batch;
615728
}

0 commit comments

Comments
 (0)