Skip to content

Commit cb6b5ac

Browse files
committed
---
yaml --- r: 35373 b: refs/heads/pubsub-ordering-keys c: c01aa11 h: refs/heads/master i: 35371: 3d4cec9
1 parent ececcfa commit cb6b5ac

7 files changed

Lines changed: 978 additions & 25 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 4ffdda3e382001ef6bf83ee68da4ba5e1d434671
158+
refs/heads/pubsub-ordering-keys: c01aa11eb9b8faddb39b5b4d860058ab90887e29
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65+
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
6566
private final ScheduledExecutorService systemExecutor;
6667
private final ApiClock clock;
6768

@@ -205,6 +206,7 @@ void sendAckOperations(
205206
jobLock = new ReentrantLock();
206207
messagesWaiter = new MessageWaiter();
207208
this.clock = clock;
209+
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
208210
}
209211

210212
void start() {
@@ -349,7 +351,7 @@ public void nack() {
349351
}
350352
};
351353
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
352-
executor.execute(
354+
Runnable deliverMessageTask =
353355
new Runnable() {
354356
@Override
355357
public void run() {
@@ -370,7 +372,12 @@ public void run() {
370372
response.setException(e);
371373
}
372374
}
373-
});
375+
};
376+
if (message.getOrderingKey().isEmpty()) {
377+
executor.execute(deliverMessageTask);
378+
} else {
379+
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380+
}
374381
}
375382

376383
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 133 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@
4949
import com.google.pubsub.v1.TopicNames;
5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.HashMap;
5253
import java.util.Iterator;
5354
import java.util.LinkedList;
5455
import java.util.List;
56+
import java.util.Map;
57+
import java.util.concurrent.Callable;
5558
import java.util.concurrent.ScheduledExecutorService;
5659
import java.util.concurrent.ScheduledFuture;
5760
import java.util.concurrent.TimeUnit;
@@ -85,15 +88,17 @@ public class Publisher {
8588
private final String topicName;
8689

8790
private final BatchingSettings batchingSettings;
91+
private final boolean enableMessageOrdering;
8892

8993
private final Lock messagesBatchLock;
90-
private MessagesBatch messagesBatch;
94+
final Map<String, MessagesBatch> messagesBatches;
9195

9296
private final AtomicBoolean activeAlarm;
9397

9498
private final PublisherStub publisherStub;
9599

96100
private final ScheduledExecutorService executor;
101+
final SequentialExecutorService.CallbackExecutor sequentialExecutor;
97102
private final AtomicBoolean shutdown;
98103
private final BackgroundResource backgroundResources;
99104
private final MessageWaiter messagesWaiter;
@@ -114,22 +119,33 @@ private Publisher(Builder builder) throws IOException {
114119
topicName = builder.topicName;
115120

116121
this.batchingSettings = builder.batchingSettings;
122+
this.enableMessageOrdering = builder.enableMessageOrdering;
117123
this.messageTransform = builder.messageTransform;
118124

119-
messagesBatch = new MessagesBatch(batchingSettings);
125+
messagesBatches = new HashMap<>();
120126
messagesBatchLock = new ReentrantLock();
121127
activeAlarm = new AtomicBoolean(false);
122128
executor = builder.executorProvider.getExecutor();
129+
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
123130
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
124131
if (builder.executorProvider.shouldAutoClose()) {
125132
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
126133
}
127134

128135
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129136
// We post-process this here to keep backward-compatibility.
130-
RetrySettings retrySettings = builder.retrySettings;
131-
if (retrySettings.getMaxAttempts() == 0) {
132-
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
137+
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
138+
// message infinitely rather than sending the next one.
139+
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
140+
if (retrySettingsBuilder.getMaxAttempts() == 0) {
141+
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
142+
}
143+
if (enableMessageOrdering) {
144+
// TODO: is there a way to have the default retry settings for requests without an ordering
145+
// key?
146+
retrySettingsBuilder
147+
.setMaxAttempts(Integer.MAX_VALUE)
148+
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
133149
}
134150

135151
PublisherStubSettings.Builder stubSettings =
@@ -147,7 +163,7 @@ private Publisher(Builder builder) throws IOException {
147163
StatusCode.Code.RESOURCE_EXHAUSTED,
148164
StatusCode.Code.UNKNOWN,
149165
StatusCode.Code.UNAVAILABLE)
150-
.setRetrySettings(retrySettings)
166+
.setRetrySettings(retrySettingsBuilder.build())
151167
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
152168
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
153169
backgroundResourceList.add(publisherStub);
@@ -194,13 +210,27 @@ public String getTopicNameString() {
194210
public ApiFuture<String> publish(PubsubMessage message) {
195211
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
196212

213+
final String orderingKey = message.getOrderingKey();
214+
Preconditions.checkState(
215+
orderingKey.isEmpty() || enableMessageOrdering,
216+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
217+
197218
final OutstandingPublish outstandingPublish =
198219
new OutstandingPublish(messageTransform.apply(message));
199220
List<OutstandingBatch> batchesToSend;
200221
messagesBatchLock.lock();
201222
try {
223+
// Check if the next message makes the current batch exceed the max batch byte size.
224+
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
225+
if (messagesBatch == null) {
226+
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
227+
messagesBatches.put(orderingKey, messagesBatch);
228+
}
229+
202230
batchesToSend = messagesBatch.add(outstandingPublish);
203-
// Setup the next duration based delivery alarm if there are messages batched.
231+
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) {
232+
messagesBatches.remove(orderingKey);
233+
}
204234
setupAlarm();
205235
} finally {
206236
messagesBatchLock.unlock();
@@ -209,6 +239,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
209239
messagesWaiter.incrementPendingMessages(1);
210240

211241
if (!batchesToSend.isEmpty()) {
242+
// TODO: if this is not an ordering keys scenario, will this do anything?
243+
publishAllWithoutInflight();
244+
245+
// TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
212246
for (final OutstandingBatch batch : batchesToSend) {
213247
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
214248
executor.execute(
@@ -224,8 +258,22 @@ public void run() {
224258
return outstandingPublish.publishResult;
225259
}
226260

261+
/**
262+
* There may be non-recoverable problems with a request for an ordering key. In that case, all
263+
* subsequent requests will fail until this method is called. If the key is not currently paused,
264+
* calling this method will be a no-op.
265+
*
266+
* @param key The key for which to resume publishing.
267+
*/
268+
// TODO: make this public when Ordering keys is live
269+
@BetaApi
270+
void resumePublish(String key) {
271+
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
272+
sequentialExecutor.resumePublish(key);
273+
}
274+
227275
private void setupAlarm() {
228-
if (!messagesBatch.isEmpty()) {
276+
if (!messagesBatches.isEmpty()) {
229277
if (!activeAlarm.getAndSet(true)) {
230278
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
231279
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
@@ -236,7 +284,7 @@ private void setupAlarm() {
236284
public void run() {
237285
logger.log(Level.FINER, "Sending messages based on schedule.");
238286
activeAlarm.getAndSet(false);
239-
publishAllOutstanding();
287+
publishAllWithoutInflight();
240288
}
241289
},
242290
delayThresholdMs,
@@ -257,16 +305,51 @@ public void run() {
257305
*/
258306
public void publishAllOutstanding() {
259307
messagesBatchLock.lock();
260-
OutstandingBatch batchToSend;
261308
try {
262-
if (messagesBatch.isEmpty()) {
263-
return;
309+
for (MessagesBatch batch : messagesBatches.values()) {
310+
if (!batch.isEmpty()) {
311+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
312+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
313+
// while this function is running. This locking mechanism needs to be improved if it
314+
// causes any performance degradation.
315+
publishOutstandingBatch(batch.popOutstandingBatch());
316+
}
317+
}
318+
messagesBatches.clear();
319+
} finally {
320+
messagesBatchLock.unlock();
321+
}
322+
}
323+
324+
/**
325+
* Publish any outstanding batches if non-empty and there are no other batches in flight. This
326+
* method sends buffered messages, but does not wait for the send operations to complete. To wait
327+
* for messages to send, call {@code get} on the futures returned from {@code publish}.
328+
*/
329+
private void publishAllWithoutInflight() {
330+
messagesBatchLock.lock();
331+
try {
332+
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator();
333+
while (it.hasNext()) {
334+
Map.Entry<String, MessagesBatch> entry = it.next();
335+
MessagesBatch batch = entry.getValue();
336+
String key = entry.getKey();
337+
if (batch.isEmpty()) {
338+
it.remove();
339+
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) {
340+
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
341+
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
342+
// while this function is running. This locking mechanism needs to be improved if it
343+
// causes any performance degradation.
344+
345+
// TODO: Will this cause a performance problem for non-ordering keys scenarios?
346+
publishOutstandingBatch(batch.popOutstandingBatch());
347+
it.remove();
348+
}
264349
}
265-
batchToSend = messagesBatch.popOutstandingBatch();
266350
} finally {
267351
messagesBatchLock.unlock();
268352
}
269-
publishOutstandingBatch(batchToSend);
270353
}
271354

272355
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
@@ -280,12 +363,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280363
}
281364

282365
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
283-
ApiFutureCallback<PublishResponse> futureCallback =
366+
final ApiFutureCallback<PublishResponse> futureCallback =
284367
new ApiFutureCallback<PublishResponse>() {
285368
@Override
286369
public void onSuccess(PublishResponse result) {
287370
try {
288-
if (result.getMessageIdsCount() != outstandingBatch.size()) {
371+
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) {
289372
outstandingBatch.onFailure(
290373
new IllegalStateException(
291374
String.format(
@@ -311,20 +394,37 @@ public void onFailure(Throwable t) {
311394
}
312395
};
313396

314-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
397+
ApiFuture<PublishResponse> future;
398+
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
399+
future = publishCall(outstandingBatch);
400+
} else {
401+
// If ordering key is specified, publish the batch using the sequential executor.
402+
future =
403+
sequentialExecutor.submit(
404+
outstandingBatch.orderingKey,
405+
new Callable<ApiFuture<PublishResponse>>() {
406+
public ApiFuture<PublishResponse> call() {
407+
return publishCall(outstandingBatch);
408+
}
409+
});
410+
}
411+
ApiFutures.addCallback(future, futureCallback, directExecutor());
315412
}
316413

317414
private static final class OutstandingBatch {
318415
final List<OutstandingPublish> outstandingPublishes;
319416
final long creationTime;
320417
int attempt;
321418
int batchSizeBytes;
419+
final String orderingKey;
322420

323-
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
421+
OutstandingBatch(
422+
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
324423
this.outstandingPublishes = outstandingPublishes;
325424
attempt = 1;
326425
creationTime = System.currentTimeMillis();
327426
this.batchSizeBytes = batchSizeBytes;
427+
this.orderingKey = orderingKey;
328428
}
329429

330430
int size() {
@@ -468,7 +568,7 @@ public static final class Builder {
468568
.setRpcTimeoutMultiplier(2)
469569
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
470570
.build();
471-
571+
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
472572
private static final int THREADS_PER_CPU = 5;
473573
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474574
InstantiatingExecutorProvider.newBuilder()
@@ -482,6 +582,8 @@ public static final class Builder {
482582

483583
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
484584

585+
private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
586+
485587
private TransportChannelProvider channelProvider =
486588
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
487589

@@ -576,6 +678,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576678
return this;
577679
}
578680

681+
/** Sets the message ordering option. */
682+
// TODO: make this public when Ordering keys is live
683+
@BetaApi
684+
Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
685+
this.enableMessageOrdering = enableMessageOrdering;
686+
return this;
687+
}
688+
579689
/** Gives the ability to set a custom executor to be used by the library. */
580690
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
581691
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -601,15 +711,17 @@ public Publisher build() throws IOException {
601711
private static class MessagesBatch {
602712
private List<OutstandingPublish> messages;
603713
private int batchedBytes;
714+
private String orderingKey;
604715
private final BatchingSettings batchingSettings;
605716

606-
public MessagesBatch(BatchingSettings batchingSettings) {
717+
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
607718
this.batchingSettings = batchingSettings;
719+
this.orderingKey = orderingKey;
608720
reset();
609721
}
610722

611723
private OutstandingBatch popOutstandingBatch() {
612-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
724+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
613725
reset();
614726
return batch;
615727
}

0 commit comments

Comments
 (0)