Skip to content

Commit 5babd56

Browse files
authored
---
yaml --- r: 27225 b: refs/heads/pubsub-ordering-keys c: a2741ae h: refs/heads/master i: 27223: 2de9c2c
1 parent f7274c1 commit 5babd56

8 files changed

Lines changed: 60 additions & 844 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 54420a4f138ed317972ad7530d6e6c9bb7664104
158+
refs/heads/pubsub-ordering-keys: a2741ae76ff1bdb35225fa3e29ec6f293427a8e8
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-api-grpc/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,7 @@
10061006
<show>protected</show>
10071007
<nohelp>true</nohelp>
10081008
<outputDirectory>${project.build.directory}/javadoc</outputDirectory>
1009+
<splitindex>true</splitindex>
10091010
</configuration>
10101011
</plugin>
10111012
</plugins>

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ class MessageDispatcher {
6262
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6363

6464
private final Executor executor;
65-
private final SequentialExecutorService sequentialExecutor;
6665
private final ScheduledExecutorService systemExecutor;
6766
private final ApiClock clock;
6867

@@ -206,7 +205,6 @@ void sendAckOperations(
206205
jobLock = new ReentrantLock();
207206
messagesWaiter = new MessageWaiter();
208207
this.clock = clock;
209-
this.sequentialExecutor = new SequentialExecutorService(executor);
210208
}
211209

212210
void start() {
@@ -351,7 +349,7 @@ public void nack() {
351349
}
352350
};
353351
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
354-
Runnable deliverMessageTask =
352+
executor.execute(
355353
new Runnable() {
356354
@Override
357355
public void run() {
@@ -372,12 +370,7 @@ public void run() {
372370
response.setException(e);
373371
}
374372
}
375-
};
376-
if (message.getOrderingKey().isEmpty()) {
377-
executor.execute(deliverMessageTask);
378-
} else {
379-
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
380-
}
373+
});
381374
}
382375

383376
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 55 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,9 @@
4848
import com.google.pubsub.v1.TopicNames;
4949
import java.io.IOException;
5050
import java.util.Collections;
51-
import java.util.EnumSet;
52-
import java.util.HashMap;
5351
import java.util.Iterator;
5452
import java.util.LinkedList;
5553
import java.util.List;
56-
import java.util.Map;
57-
import java.util.Set;
58-
import java.util.concurrent.Callable;
5954
import java.util.concurrent.ScheduledExecutorService;
6055
import java.util.concurrent.ScheduledFuture;
6156
import java.util.concurrent.TimeUnit;
@@ -89,17 +84,15 @@ public class Publisher {
8984
private final String topicName;
9085

9186
private final BatchingSettings batchingSettings;
92-
private final boolean enableMessageOrdering;
9387

9488
private final Lock messagesBatchLock;
95-
private final Map<String, MessagesBatch> messagesBatches;
89+
private MessagesBatch messagesBatch;
9690

9791
private final AtomicBoolean activeAlarm;
9892

9993
private final PublisherStub publisherStub;
10094

10195
private final ScheduledExecutorService executor;
102-
private final SequentialExecutorService<PublishResponse> sequentialExecutor;
10396
private final AtomicBoolean shutdown;
10497
private final List<AutoCloseable> closeables;
10598
private final MessageWaiter messagesWaiter;
@@ -120,14 +113,12 @@ private Publisher(Builder builder) throws IOException {
120113
topicName = builder.topicName;
121114

122115
this.batchingSettings = builder.batchingSettings;
123-
this.enableMessageOrdering = builder.enableMessageOrdering;
124116
this.messageTransform = builder.messageTransform;
125117

126-
messagesBatches = new HashMap<>();
118+
messagesBatch = new MessagesBatch();
127119
messagesBatchLock = new ReentrantLock();
128120
activeAlarm = new AtomicBoolean(false);
129121
executor = builder.executorProvider.getExecutor();
130-
sequentialExecutor = new SequentialExecutorService<>(executor);
131122
if (builder.executorProvider.shouldAutoClose()) {
132123
closeables =
133124
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
@@ -137,31 +128,9 @@ private Publisher(Builder builder) throws IOException {
137128

138129
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
139130
// We post-process this here to keep backward-compatibility.
140-
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
141-
// message infinitely rather than sending the next one.
142-
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
143-
if (retrySettingsBuilder.getMaxAttempts() == 0) {
144-
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
145-
}
146-
if (enableMessageOrdering) {
147-
retrySettingsBuilder
148-
.setMaxAttempts(Integer.MAX_VALUE)
149-
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
150-
}
151-
152-
Set<StatusCode.Code> retryCodes;
153-
if (enableMessageOrdering) {
154-
retryCodes = EnumSet.allOf(StatusCode.Code.class);
155-
} else {
156-
retryCodes =
157-
EnumSet.of(
158-
StatusCode.Code.ABORTED,
159-
StatusCode.Code.CANCELLED,
160-
StatusCode.Code.DEADLINE_EXCEEDED,
161-
StatusCode.Code.INTERNAL,
162-
StatusCode.Code.RESOURCE_EXHAUSTED,
163-
StatusCode.Code.UNKNOWN,
164-
StatusCode.Code.UNAVAILABLE);
131+
RetrySettings retrySettings = builder.retrySettings;
132+
if (retrySettings.getMaxAttempts() == 0) {
133+
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
165134
}
166135

167136
PublisherStubSettings.Builder stubSettings =
@@ -171,8 +140,15 @@ private Publisher(Builder builder) throws IOException {
171140
.setTransportChannelProvider(builder.channelProvider);
172141
stubSettings
173142
.publishSettings()
174-
.setRetryableCodes(retryCodes)
175-
.setRetrySettings(retrySettingsBuilder.build())
143+
.setRetryableCodes(
144+
StatusCode.Code.ABORTED,
145+
StatusCode.Code.CANCELLED,
146+
StatusCode.Code.DEADLINE_EXCEEDED,
147+
StatusCode.Code.INTERNAL,
148+
StatusCode.Code.RESOURCE_EXHAUSTED,
149+
StatusCode.Code.UNKNOWN,
150+
StatusCode.Code.UNAVAILABLE)
151+
.setRetrySettings(retrySettings)
176152
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
177153
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
178154

@@ -220,12 +196,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
220196
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
221197
}
222198

223-
final String orderingKey = message.getOrderingKey();
224-
if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) {
225-
throw new IllegalStateException(
226-
"Cannot publish a message with an ordering key when message ordering is not enabled.");
227-
}
228-
229199
message = messageTransform.apply(message);
230200
final int messageSize = message.getSerializedSize();
231201
OutstandingBatch batchToSend = null;
@@ -234,38 +204,30 @@ public ApiFuture<String> publish(PubsubMessage message) {
234204
messagesBatchLock.lock();
235205
try {
236206
// Check if the next message makes the current batch exceed the max batch byte size.
237-
MessagesBatch messageBatch = messagesBatches.get(orderingKey);
238-
if (messageBatch == null) {
239-
messageBatch = new MessagesBatch(orderingKey);
240-
messagesBatches.put(orderingKey, messageBatch);
241-
}
242-
if (!messageBatch.isEmpty()
207+
if (!messagesBatch.isEmpty()
243208
&& hasBatchingBytes()
244-
&& messageBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
245-
batchToSend = messageBatch.popOutstandingBatch();
209+
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
210+
batchToSend = messagesBatch.popOutstandingBatch();
246211
}
247212

248213
// Border case if the message to send is greater or equals to the max batch size then can't
249214
// be included in the current batch and instead sent immediately.
250215
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
251-
messageBatch.addMessage(outstandingPublish, messageSize);
216+
messagesBatch.addMessage(outstandingPublish, messageSize);
217+
252218
// If after adding the message we have reached the batch max messages then we have a batch
253219
// to send.
254-
if (messageBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
255-
batchToSend = messageBatch.popOutstandingBatch();
220+
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
221+
batchToSend = messagesBatch.popOutstandingBatch();
256222
}
257223
}
258-
259224
// Setup the next duration based delivery alarm if there are messages batched.
260-
if (!messageBatch.isEmpty()) {
225+
if (!messagesBatch.isEmpty()) {
261226
setupDurationBasedPublishAlarm();
262-
} else {
263-
messagesBatches.remove(orderingKey);
264-
if (currentAlarmFuture != null) {
265-
logger.log(Level.FINER, "Cancelling alarm, no more messages");
266-
if (activeAlarm.getAndSet(false)) {
267-
currentAlarmFuture.cancel(false);
268-
}
227+
} else if (currentAlarmFuture != null) {
228+
logger.log(Level.FINER, "Cancelling alarm, no more messages");
229+
if (activeAlarm.getAndSet(false)) {
230+
currentAlarmFuture.cancel(false);
269231
}
270232
}
271233
} finally {
@@ -276,18 +238,29 @@ && hasBatchingBytes()
276238

277239
if (batchToSend != null) {
278240
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
279-
publishAllOutstanding();
280-
publishOutstandingBatch(batchToSend);
241+
final OutstandingBatch finalBatchToSend = batchToSend;
242+
executor.execute(
243+
new Runnable() {
244+
@Override
245+
public void run() {
246+
publishOutstandingBatch(finalBatchToSend);
247+
}
248+
});
281249
}
282250

283251
// If the message is over the size limit, it was not added to the pending messages and it will
284252
// be sent in its own batch immediately.
285253
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
286254
logger.log(
287255
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
288-
publishAllOutstanding();
289-
publishOutstandingBatch(
290-
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey));
256+
executor.execute(
257+
new Runnable() {
258+
@Override
259+
public void run() {
260+
publishOutstandingBatch(
261+
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
262+
}
263+
});
291264
}
292265

293266
return publishResult;
@@ -319,33 +292,27 @@ public void run() {
319292
*/
320293
public void publishAllOutstanding() {
321294
messagesBatchLock.lock();
295+
OutstandingBatch batchToSend;
322296
try {
323-
for (MessagesBatch batch : messagesBatches.values()) {
324-
if (!batch.isEmpty()) {
325-
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
326-
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
327-
// while this function is running. This locking mechanism needs to be improved if it
328-
// causes any performance degradation.
329-
publishOutstandingBatch(batch.popOutstandingBatch());
330-
}
297+
if (messagesBatch.isEmpty()) {
298+
return;
331299
}
332-
messagesBatches.clear();
300+
batchToSend = messagesBatch.popOutstandingBatch();
333301
} finally {
334302
messagesBatchLock.unlock();
335303
}
304+
publishOutstandingBatch(batchToSend);
336305
}
337306

338-
private ApiFuture publishCall(final OutstandingBatch outstandingBatch) {
307+
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
339308
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
340309
publishRequest.setTopic(topicName);
341310
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
342311
publishRequest.addMessages(outstandingPublish.message);
343312
}
344-
return publisherStub.publishCallable().futureCall(publishRequest.build());
345-
}
346313

347-
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
348-
final ApiFutureCallback<PublishResponse> futureCallback =
314+
ApiFutures.addCallback(
315+
publisherStub.publishCallable().futureCall(publishRequest.build()),
349316
new ApiFutureCallback<PublishResponse>() {
350317
@Override
351318
public void onSuccess(PublishResponse result) {
@@ -384,47 +351,21 @@ public void onFailure(Throwable t) {
384351
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
385352
}
386353
}
387-
};
388-
389-
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
390-
// If ordering key is empty, publish the batch using the normal executor.
391-
Runnable task =
392-
new Runnable() {
393-
public void run() {
394-
ApiFutures.addCallback(
395-
publishCall(outstandingBatch), futureCallback, directExecutor());
396-
}
397-
};
398-
executor.execute(task);
399-
} else {
400-
// If ordering key is specified, publish the batch using the sequential executor.
401-
Callable<ApiFuture> func =
402-
new Callable<ApiFuture>() {
403-
public ApiFuture call() {
404-
return publishCall(outstandingBatch);
405-
}
406-
};
407-
ApiFutures.addCallback(
408-
sequentialExecutor.submit(outstandingBatch.orderingKey, func),
409-
futureCallback,
410-
directExecutor());
411-
}
354+
},
355+
directExecutor());
412356
}
413357

414358
private static final class OutstandingBatch {
415359
final List<OutstandingPublish> outstandingPublishes;
416360
final long creationTime;
417361
int attempt;
418362
int batchSizeBytes;
419-
final String orderingKey;
420363

421-
OutstandingBatch(
422-
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
364+
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
423365
this.outstandingPublishes = outstandingPublishes;
424366
attempt = 1;
425367
creationTime = System.currentTimeMillis();
426368
this.batchSizeBytes = batchSizeBytes;
427-
this.orderingKey = orderingKey;
428369
}
429370

430371
public int getAttempt() {
@@ -562,7 +503,7 @@ public static final class Builder {
562503
.setRpcTimeoutMultiplier(2)
563504
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
564505
.build();
565-
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
506+
566507
private static final int THREADS_PER_CPU = 5;
567508
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
568509
InstantiatingExecutorProvider.newBuilder()
@@ -576,8 +517,6 @@ public static final class Builder {
576517

577518
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
578519

579-
boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
580-
581520
TransportChannelProvider channelProvider =
582521
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
583522

@@ -672,12 +611,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
672611
return this;
673612
}
674613

675-
/** Sets the message ordering option. */
676-
public Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
677-
this.enableMessageOrdering = enableMessageOrdering;
678-
return this;
679-
}
680-
681614
/** Gives the ability to set a custom executor to be used by the library. */
682615
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
683616
this.executorProvider = Preconditions.checkNotNull(executorProvider);
@@ -703,14 +636,9 @@ public Publisher build() throws IOException {
703636
private static class MessagesBatch {
704637
private List<OutstandingPublish> messages = new LinkedList<>();
705638
private int batchedBytes;
706-
private String orderingKey;
707-
708-
private MessagesBatch(String orderingKey) {
709-
this.orderingKey = orderingKey;
710-
}
711639

712640
private OutstandingBatch popOutstandingBatch() {
713-
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
641+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
714642
reset();
715643
return batch;
716644
}

0 commit comments

Comments
 (0)