Skip to content

Commit 86bbaea

Browse files
authored
---
yaml --- r: 30719 b: refs/heads/autosynth-bigquerydatatransfer c: abf92a3 h: refs/heads/master i: 30717: 1b01112 30715: d761553 30711: 55b6038 30703: 97365c2 30687: e8773e7 30655: 768abac 30591: c46245d 30463: 61d5885 30207: 405f7b5 29695: 0bbc4da 28671: 0ab66f1
1 parent 564a2b2 commit 86bbaea

2 files changed

Lines changed: 23 additions & 39 deletions

File tree

  • branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123123
refs/heads/autosynth-asset: bdb45634a0fe8f7a510692b56b31f5312e25f453
124124
refs/heads/autosynth-automl: 22f9dd5b6f5df8dbfa7da0126864d565229519b2
125-
refs/heads/autosynth-bigquerydatatransfer: 264643df7bcc715efbd648c3ab40389ae2253d19
125+
refs/heads/autosynth-bigquerydatatransfer: abf92a3ea30a7375790f6447532f6fd45bed5744
126126
refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca

branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@
4040
import com.google.cloud.pubsub.v1.stub.PublisherStub;
4141
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
4242
import com.google.common.base.Preconditions;
43-
import com.google.common.collect.ImmutableList;
4443
import com.google.pubsub.v1.PublishRequest;
4544
import com.google.pubsub.v1.PublishResponse;
4645
import com.google.pubsub.v1.PubsubMessage;
4746
import com.google.pubsub.v1.TopicName;
4847
import com.google.pubsub.v1.TopicNames;
4948
import java.io.IOException;
49+
import java.util.ArrayList;
5050
import java.util.Collections;
5151
import java.util.Iterator;
5252
import java.util.LinkedList;
@@ -197,7 +197,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
197197
}
198198

199199
message = messageTransform.apply(message);
200-
OutstandingBatch batchToSend = null;
200+
List<OutstandingBatch> batchesToSend = new ArrayList<>();
201201
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
202202
messagesBatchLock.lock();
203203
try {
@@ -206,19 +206,18 @@ public ApiFuture<String> publish(PubsubMessage message) {
206206
&& hasBatchingBytes()
207207
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
208208
>= getMaxBatchBytes()) {
209-
batchToSend = messagesBatch.popOutstandingBatch();
209+
batchesToSend.add(messagesBatch.popOutstandingBatch());
210210
}
211211

212-
// Border case if the message to send is greater or equals to the max batch size then can't
213-
// be included in the current batch and instead sent immediately.
214-
if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) {
215-
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
212+
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
216213

217-
// If after adding the message we have reached the batch max messages then we have a batch
218-
// to send.
219-
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
220-
batchToSend = messagesBatch.popOutstandingBatch();
221-
}
214+
// Border case: If the message to send is greater or equals to the max batch size then send it
215+
// immediately.
216+
// Alternatively if after adding the message we have reached the batch max messages then we
217+
// have a batch to send.
218+
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
219+
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
220+
batchesToSend.add(messagesBatch.popOutstandingBatch());
222221
}
223222
// Setup the next duration based delivery alarm if there are messages batched.
224223
setupAlarm();
@@ -228,32 +227,17 @@ && hasBatchingBytes()
228227

229228
messagesWaiter.incrementPendingMessages(1);
230229

231-
if (batchToSend != null) {
232-
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
233-
final OutstandingBatch finalBatchToSend = batchToSend;
234-
executor.execute(
235-
new Runnable() {
236-
@Override
237-
public void run() {
238-
publishOutstandingBatch(finalBatchToSend);
239-
}
240-
});
241-
}
242-
243-
// If the message is over the size limit, it was not added to the pending messages and it will
244-
// be sent in its own batch immediately.
245-
if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) {
246-
logger.log(
247-
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
248-
executor.execute(
249-
new Runnable() {
250-
@Override
251-
public void run() {
252-
publishOutstandingBatch(
253-
new OutstandingBatch(
254-
ImmutableList.of(outstandingPublish), outstandingPublish.messageSize));
255-
}
256-
});
230+
if (!batchesToSend.isEmpty()) {
231+
for (final OutstandingBatch batch : batchesToSend) {
232+
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
233+
executor.execute(
234+
new Runnable() {
235+
@Override
236+
public void run() {
237+
publishOutstandingBatch(batch);
238+
}
239+
});
240+
}
257241
}
258242

259243
return outstandingPublish.publishResult;

0 commit comments

Comments
 (0)