Skip to content

Commit bfe6124

Browse files
kimkyung-googkolea2
authored andcommitted
---
yaml --- r: 20657 b: refs/heads/pubsub-ordering-keys c: 858d4e9 h: refs/heads/master i: 20655: 7eca050
1 parent 53cce6e commit bfe6124

3 files changed

Lines changed: 36 additions & 3 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
157157
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
158158
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
159159
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
160-
refs/heads/pubsub-ordering-keys: fab0188fc02390be1c7a12051b37e722e82fcd27
160+
refs/heads/pubsub-ordering-keys: 858d4e986a0ba48e08f00d42f51cbdecb175f5d6
161161
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
162162
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
163163
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,18 +269,18 @@ && hasBatchingBytes()
269269

270270
if (batchToSend != null) {
271271
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
272-
publishOutstandingBatch(batchToSend);
273272
publishAllOutstanding();
273+
publishOutstandingBatch(batchToSend);
274274
}
275275

276276
// If the message is over the size limit, it was not added to the pending messages and it will
277277
// be sent in its own batch immediately.
278278
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
279279
logger.log(
280280
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
281+
publishAllOutstanding();
281282
publishOutstandingBatch(
282283
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey));
283-
publishAllOutstanding();
284284
}
285285

286286
return publishResult;

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,39 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
338338
publisher.shutdown();
339339
}
340340

341+
@Test
342+
public void testLargeMessagesDoNotReorderBatches() throws Exception {
343+
// Set the maximum batching size to 20 bytes.
344+
Publisher publisher =
345+
getTestPublisherBuilder()
346+
.setBatchingSettings(
347+
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
348+
.toBuilder()
349+
.setElementCountThreshold(10L)
350+
.setRequestByteThreshold(20L)
351+
.setDelayThreshold(Duration.ofSeconds(100))
352+
.build())
353+
.setEnableMessageOrdering(true)
354+
.build();
355+
testPublisherServiceImpl.setAutoPublishResponse(true);
356+
ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA");
357+
ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB");
358+
359+
assertFalse(publishFuture1.isDone());
360+
assertFalse(publishFuture2.isDone());
361+
362+
ApiFuture<String> publishFuture3 =
363+
sendTestMessageWithOrderingKey(publisher, "VeryLargeMessage", "OrderB");
364+
// Verify that messages with "OrderB" were delivered in order.
365+
assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get()));
366+
367+
assertTrue(publishFuture1.isDone());
368+
assertTrue(publishFuture2.isDone());
369+
assertTrue(publishFuture3.isDone());
370+
371+
publisher.shutdown();
372+
}
373+
341374
@Test
342375
public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
343376
// Message ordering is disabled by default.

0 commit comments

Comments
 (0)