@@ -197,23 +197,22 @@ public ApiFuture<String> publish(PubsubMessage message) {
197197 }
198198
199199 message = messageTransform .apply (message );
200- final int messageSize = message .getSerializedSize ();
201200 OutstandingBatch batchToSend = null ;
202- SettableApiFuture <String > publishResult = SettableApiFuture .<String >create ();
203- final OutstandingPublish outstandingPublish = new OutstandingPublish (publishResult , message );
201+ final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
204202 messagesBatchLock .lock ();
205203 try {
206204 // Check if the next message makes the current batch exceed the max batch byte size.
207205 if (!messagesBatch .isEmpty ()
208206 && hasBatchingBytes ()
209- && messagesBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
207+ && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
208+ >= getMaxBatchBytes ()) {
210209 batchToSend = messagesBatch .popOutstandingBatch ();
211210 }
212211
213212 // Border case if the message to send is greater or equals to the max batch size then can't
214213 // be included in the current batch and instead sent immediately.
215- if (!hasBatchingBytes () || messageSize < getMaxBatchBytes ()) {
216- messagesBatch .addMessage (outstandingPublish , messageSize );
214+ if (!hasBatchingBytes () || outstandingPublish . messageSize < getMaxBatchBytes ()) {
215+ messagesBatch .addMessage (outstandingPublish , outstandingPublish . messageSize );
217216
218217 // If after adding the message we have reached the batch max messages then we have a batch
219218 // to send.
@@ -243,20 +242,21 @@ public void run() {
243242
244243 // If the message is over the size limit, it was not added to the pending messages and it will
245244 // be sent in its own batch immediately.
246- if (hasBatchingBytes () && messageSize >= getMaxBatchBytes ()) {
245+ if (hasBatchingBytes () && outstandingPublish . messageSize >= getMaxBatchBytes ()) {
247246 logger .log (
248247 Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
249248 executor .execute (
250249 new Runnable () {
251250 @ Override
252251 public void run () {
253252 publishOutstandingBatch (
254- new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize ));
253+ new OutstandingBatch (
254+ ImmutableList .of (outstandingPublish ), outstandingPublish .messageSize ));
255255 }
256256 });
257257 }
258258
259- return publishResult ;
259+ return outstandingPublish . publishResult ;
260260 }
261261
262262 private void setupAlarm () {
@@ -382,12 +382,14 @@ public int size() {
382382 }
383383
384384 private static final class OutstandingPublish {
385- SettableApiFuture <String > publishResult ;
386- PubsubMessage message ;
385+ final SettableApiFuture <String > publishResult ;
386+ final PubsubMessage message ;
387+ final int messageSize ;
387388
388- OutstandingPublish (SettableApiFuture < String > publishResult , PubsubMessage message ) {
389- this .publishResult = publishResult ;
389+ OutstandingPublish (PubsubMessage message ) {
390+ this .publishResult = SettableApiFuture . create () ;
390391 this .message = message ;
392+ this .messageSize = message .getSerializedSize ();
391393 }
392394 }
393395
0 commit comments