@@ -197,23 +197,22 @@ public ApiFuture<String> publish(PubsubMessage message) {
197197 }
198198
199199 message = messageTransform.apply(message);
200- final int messageSize = message.getSerializedSize();
201200 OutstandingBatch batchToSend = null;
202- SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
203- final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message);
201+ final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
204202 messagesBatchLock.lock();
205203 try {
206204 // Check if the next message makes the current batch exceed the max batch byte size.
207205 if (!messagesBatch.isEmpty()
208206 && hasBatchingBytes()
209- && messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
207+ && messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
208+ >= getMaxBatchBytes()) {
210209 batchToSend = messagesBatch.popOutstandingBatch();
211210 }
212211
213212 // Border case if the message to send is greater or equals to the max batch size then can't
214213 // be included in the current batch and instead sent immediately.
215- if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
216- messagesBatch.addMessage(outstandingPublish, messageSize);
214+ if (!hasBatchingBytes() || outstandingPublish. messageSize < getMaxBatchBytes()) {
215+ messagesBatch.addMessage(outstandingPublish, outstandingPublish. messageSize);
217216
218217 // If after adding the message we have reached the batch max messages then we have a batch
219218 // to send.
@@ -243,20 +242,21 @@ public void run() {
243242
244243 // If the message is over the size limit, it was not added to the pending messages and it will
245244 // be sent in its own batch immediately.
246- if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
245+ if (hasBatchingBytes() && outstandingPublish. messageSize >= getMaxBatchBytes()) {
247246 logger.log(
248247 Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
249248 executor.execute(
250249 new Runnable() {
251250 @Override
252251 public void run() {
253252 publishOutstandingBatch(
254- new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
253+ new OutstandingBatch(
254+ ImmutableList.of(outstandingPublish), outstandingPublish.messageSize));
255255 }
256256 });
257257 }
258258
259- return publishResult;
259+ return outstandingPublish. publishResult;
260260 }
261261
262262 private void setupAlarm() {
@@ -382,12 +382,14 @@ public int size() {
382382 }
383383
384384 private static final class OutstandingPublish {
385- SettableApiFuture<String> publishResult;
386- PubsubMessage message;
385+ final SettableApiFuture<String> publishResult;
386+ final PubsubMessage message;
387+ final int messageSize;
387388
388- OutstandingPublish(SettableApiFuture<String> publishResult, PubsubMessage message) {
389- this.publishResult = publishResult ;
389+ OutstandingPublish(PubsubMessage message) {
390+ this.publishResult = SettableApiFuture.create() ;
390391 this.message = message;
392+ this.messageSize = message.getSerializedSize();
391393 }
392394 }
393395
0 commit comments