Skip to content

Commit e3548ae

Browse files
authored
---
yaml --- r: 31839 b: refs/heads/autosynth-datastore c: b1ebd2d h: refs/heads/master i: 31837: 08c42d7 31835: 83276c9 31831: 4ea6052 31823: 62a47bb 31807: 681f4b1
1 parent a96ae39 commit e3548ae

2 files changed

Lines changed: 42 additions & 17 deletions

File tree

  • branches/autosynth-datastore/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ refs/heads/autosynth-bigquerystorage: a345f72d7a6358e9fea9dcdae94ec85a27da3088
127127
refs/heads/autosynth-bigtable: 2a6af7e2959fd79c99b8ca5d773f29a6434b546d
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
129129
refs/heads/autosynth-containeranalysis: 18d210f81f17cf74864d0db2c29e834302f74f2a
130-
refs/heads/autosynth-datastore: 79b64df4705b7a47895ec02aae4e84e5b8107884
130+
refs/heads/autosynth-datastore: b1ebd2d84384315f5788d1fa9f48c0a60265de20
131131
refs/heads/autosynth-dialogflow: 841930a680ebe370c9fc6ae824465f6bc51a5a46
132132
refs/heads/autosynth-errorreporting: 3f176c20b55dfaaa8fc32f28d82b31784b93e636
133133
refs/heads/autosynth-firestore: 983c75e4fb1076502c8cac73ef0538bdb10884f3

branches/autosynth-datastore/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public class Publisher {
8686
private final BatchingSettings batchingSettings;
8787

8888
private final Lock messagesBatchLock;
89-
private List<OutstandingPublish> messagesBatch;
90-
private int batchedBytes;
89+
private MessagesBatch messagesBatch;
9190

9291
private final AtomicBoolean activeAlarm;
9392

@@ -116,7 +115,7 @@ private Publisher(Builder builder) throws IOException {
116115
this.batchingSettings = builder.batchingSettings;
117116
this.messageTransform = builder.messageTransform;
118117

119-
messagesBatch = new LinkedList<>();
118+
messagesBatch = new MessagesBatch();
120119
messagesBatchLock = new ReentrantLock();
121120
activeAlarm = new AtomicBoolean(false);
122121
executor = builder.executorProvider.getExecutor();
@@ -207,24 +206,19 @@ public ApiFuture<String> publish(PubsubMessage message) {
207206
// Check if the next message makes the current batch exceed the max batch byte size.
208207
if (!messagesBatch.isEmpty()
209208
&& hasBatchingBytes()
210-
&& batchedBytes + messageSize >= getMaxBatchBytes()) {
211-
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
212-
messagesBatch = new LinkedList<>();
213-
batchedBytes = 0;
209+
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
210+
batchToSend = messagesBatch.popOutstandingBatch();
214211
}
215212

216213
// Border case if the message to send is greater or equals to the max batch size then can't
217214
// be included in the current batch and instead sent immediately.
218215
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
219-
batchedBytes += messageSize;
220-
messagesBatch.add(outstandingPublish);
216+
messagesBatch.addMessage(outstandingPublish, messageSize);
221217

222218
// If after adding the message we have reached the batch max messages then we have a batch
223219
// to send.
224-
if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) {
225-
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
226-
messagesBatch = new LinkedList<>();
227-
batchedBytes = 0;
220+
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
221+
batchToSend = messagesBatch.popOutstandingBatch();
228222
}
229223
}
230224
// Setup the next duration based delivery alarm if there are messages batched.
@@ -303,9 +297,7 @@ public void publishAllOutstanding() {
303297
if (messagesBatch.isEmpty()) {
304298
return;
305299
}
306-
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
307-
messagesBatch = new LinkedList<>();
308-
batchedBytes = 0;
300+
batchToSend = messagesBatch.popOutstandingBatch();
309301
} finally {
310302
messagesBatchLock.unlock();
311303
}
@@ -640,4 +632,37 @@ public Publisher build() throws IOException {
640632
return new Publisher(this);
641633
}
642634
}
635+
636+
private static class MessagesBatch {
637+
private List<OutstandingPublish> messages = new LinkedList<>();
638+
private int batchedBytes;
639+
640+
private OutstandingBatch popOutstandingBatch() {
641+
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
642+
reset();
643+
return batch;
644+
}
645+
646+
private void reset() {
647+
messages = new LinkedList<>();
648+
batchedBytes = 0;
649+
}
650+
651+
private boolean isEmpty() {
652+
return messages.isEmpty();
653+
}
654+
655+
private int getBatchedBytes() {
656+
return batchedBytes;
657+
}
658+
659+
private void addMessage(OutstandingPublish message, int messageSize) {
660+
messages.add(message);
661+
batchedBytes += messageSize;
662+
}
663+
664+
private int getMessagesCount() {
665+
return messages.size();
666+
}
667+
}
643668
}

0 commit comments

Comments
 (0)