Skip to content

Commit d761553

Browse files
authored
---
yaml --- r: 30715 b: refs/heads/autosynth-bigquerydatatransfer c: 909b0e8 h: refs/heads/master i: 30713: 5858c06 30711: 55b6038
1 parent 03cfd7a commit d761553

2 files changed

Lines changed: 16 additions & 14 deletions

File tree

  • branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123123
refs/heads/autosynth-asset: bdb45634a0fe8f7a510692b56b31f5312e25f453
124124
refs/heads/autosynth-automl: 22f9dd5b6f5df8dbfa7da0126864d565229519b2
125-
refs/heads/autosynth-bigquerydatatransfer: e67fd018a79de01181f1806a4597055d5d2104bb
125+
refs/heads/autosynth-bigquerydatatransfer: 909b0e8ef2d87a5d75da3ebfb595868a94645aae
126126
refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca

branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -197,23 +197,22 @@ public ApiFuture<String> publish(PubsubMessage message) {
197197
}
198198

199199
message = messageTransform.apply(message);
200-
final int messageSize = message.getSerializedSize();
201200
OutstandingBatch batchToSend = null;
202-
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
203-
final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message);
201+
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
204202
messagesBatchLock.lock();
205203
try {
206204
// Check if the next message makes the current batch exceed the max batch byte size.
207205
if (!messagesBatch.isEmpty()
208206
&& hasBatchingBytes()
209-
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
207+
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
208+
>= getMaxBatchBytes()) {
210209
batchToSend = messagesBatch.popOutstandingBatch();
211210
}
212211

213212
// Border case if the message to send is greater or equals to the max batch size then can't
214213
// be included in the current batch and instead sent immediately.
215-
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
216-
messagesBatch.addMessage(outstandingPublish, messageSize);
214+
if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) {
215+
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
217216

218217
// If after adding the message we have reached the batch max messages then we have a batch
219218
// to send.
@@ -243,20 +242,21 @@ public void run() {
243242

244243
// If the message is over the size limit, it was not added to the pending messages and it will
245244
// be sent in its own batch immediately.
246-
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
245+
if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) {
247246
logger.log(
248247
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
249248
executor.execute(
250249
new Runnable() {
251250
@Override
252251
public void run() {
253252
publishOutstandingBatch(
254-
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
253+
new OutstandingBatch(
254+
ImmutableList.of(outstandingPublish), outstandingPublish.messageSize));
255255
}
256256
});
257257
}
258258

259-
return publishResult;
259+
return outstandingPublish.publishResult;
260260
}
261261

262262
private void setupAlarm() {
@@ -382,12 +382,14 @@ public int size() {
382382
}
383383

384384
private static final class OutstandingPublish {
385-
SettableApiFuture<String> publishResult;
386-
PubsubMessage message;
385+
final SettableApiFuture<String> publishResult;
386+
final PubsubMessage message;
387+
final int messageSize;
387388

388-
OutstandingPublish(SettableApiFuture<String> publishResult, PubsubMessage message) {
389-
this.publishResult = publishResult;
389+
OutstandingPublish(PubsubMessage message) {
390+
this.publishResult = SettableApiFuture.create();
390391
this.message = message;
392+
this.messageSize = message.getSerializedSize();
391393
}
392394
}
393395

0 commit comments

Comments
 (0)