Skip to content

Commit 7cd9967

Browse files
authored
---
yaml --- r: 32755 b: refs/heads/autosynth-iot c: 61b9e4f h: refs/heads/master i: 32753: 340af83 32751: 740f58e
1 parent fdd7457 commit 7cd9967

2 files changed

Lines changed: 45 additions & 37 deletions

File tree

  • branches/autosynth-iot/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ refs/heads/autosynth-datastore: f1efc3dc465174f41041acd56cf29badcec3e5bd
131131
refs/heads/autosynth-dialogflow: 73974cc32e5212aec0126472e0bc1442886fedaf
132132
refs/heads/autosynth-errorreporting: effe8001d110ad584187b30aafc473db0dd4a15f
133133
refs/heads/autosynth-firestore: e79eeb26930dfae4439424ad2fda5874eeca54c8
134-
refs/heads/autosynth-iot: 59c609fc9dd9d865a1466969410e280b7bdf62dd
134+
refs/heads/autosynth-iot: 61b9e4fc725fda428fa6ec8ffe177cd86bf230a9
135135
refs/heads/autosynth-kms: 6b65b0f34c12d141031c7288cdc01e550212d0f6
136136
refs/heads/autosynth-language: e73905aa7672afa47240e65b25c087207f4594f9
137137
refs/heads/autosynth-os-login: 123ba209c5769d0ee067e0ce5848bec13b42a4f4

branches/autosynth-iot/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private Publisher(Builder builder) throws IOException {
116116
this.batchingSettings = builder.batchingSettings;
117117
this.messageTransform = builder.messageTransform;
118118

119-
messagesBatch = new MessagesBatch();
119+
messagesBatch = new MessagesBatch(batchingSettings);
120120
messagesBatchLock = new ReentrantLock();
121121
activeAlarm = new AtomicBoolean(false);
122122
executor = builder.executorProvider.getExecutor();
@@ -196,29 +196,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
196196
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
197197
}
198198

199-
message = messageTransform.apply(message);
200-
List<OutstandingBatch> batchesToSend = new ArrayList<>();
201-
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
199+
final OutstandingPublish outstandingPublish =
200+
new OutstandingPublish(messageTransform.apply(message));
201+
List<OutstandingBatch> batchesToSend;
202202
messagesBatchLock.lock();
203203
try {
204-
// Check if the next message makes the current batch exceed the max batch byte size.
205-
if (!messagesBatch.isEmpty()
206-
&& hasBatchingBytes()
207-
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
208-
>= getMaxBatchBytes()) {
209-
batchesToSend.add(messagesBatch.popOutstandingBatch());
210-
}
211-
212-
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
213-
214-
// Border case: If the message to send is greater or equals to the max batch size then send it
215-
// immediately.
216-
// Alternatively if after adding the message we have reached the batch max messages then we
217-
// have a batch to send.
218-
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
219-
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
220-
batchesToSend.add(messagesBatch.popOutstandingBatch());
221-
}
204+
batchesToSend = messagesBatch.add(outstandingPublish);
222205
// Setup the next duration based delivery alarm if there are messages batched.
223206
setupAlarm();
224207
} finally {
@@ -378,10 +361,6 @@ public BatchingSettings getBatchingSettings() {
378361
return batchingSettings;
379362
}
380363

381-
private long getMaxBatchBytes() {
382-
return getBatchingSettings().getRequestByteThreshold();
383-
}
384-
385364
/**
386365
* Schedules immediate publishing of any outstanding messages and waits until all are processed.
387366
*
@@ -410,10 +389,6 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
410389
return backgroundResources.awaitTermination(duration, unit);
411390
}
412391

413-
private boolean hasBatchingBytes() {
414-
return getMaxBatchBytes() > 0;
415-
}
416-
417392
/**
418393
* Constructs a new {@link Builder} using the given topic.
419394
*
@@ -616,8 +591,14 @@ public Publisher build() throws IOException {
616591
}
617592

618593
private static class MessagesBatch {
619-
private List<OutstandingPublish> messages = new LinkedList<>();
594+
private List<OutstandingPublish> messages;
620595
private int batchedBytes;
596+
private final BatchingSettings batchingSettings;
597+
598+
public MessagesBatch(BatchingSettings batchingSettings) {
599+
this.batchingSettings = batchingSettings;
600+
reset();
601+
}
621602

622603
private OutstandingBatch popOutstandingBatch() {
623604
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
@@ -638,13 +619,40 @@ private int getBatchedBytes() {
638619
return batchedBytes;
639620
}
640621

641-
private void addMessage(OutstandingPublish message, int messageSize) {
642-
messages.add(message);
643-
batchedBytes += messageSize;
644-
}
645-
646622
private int getMessagesCount() {
647623
return messages.size();
648624
}
625+
626+
private boolean hasBatchingBytes() {
627+
return getMaxBatchBytes() > 0;
628+
}
629+
630+
private long getMaxBatchBytes() {
631+
return batchingSettings.getRequestByteThreshold();
632+
}
633+
634+
private List<OutstandingBatch> add(OutstandingPublish outstandingPublish) {
635+
List<OutstandingBatch> batchesToSend = new ArrayList<>();
636+
// Check if the next message makes the current batch exceed the max batch byte size.
637+
if (!isEmpty()
638+
&& hasBatchingBytes()
639+
&& getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) {
640+
batchesToSend.add(popOutstandingBatch());
641+
}
642+
643+
messages.add(outstandingPublish);
644+
batchedBytes += outstandingPublish.messageSize;
645+
646+
// Border case: If the message to send is greater or equals to the max batch size then send it
647+
// immediately.
648+
// Alternatively if after adding the message we have reached the batch max messages then we
649+
// have a batch to send.
650+
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
651+
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
652+
batchesToSend.add(popOutstandingBatch());
653+
}
654+
655+
return batchesToSend;
656+
}
649657
}
650658
}

0 commit comments

Comments
 (0)