4040import com .google .cloud .pubsub .v1 .stub .PublisherStub ;
4141import com .google .cloud .pubsub .v1 .stub .PublisherStubSettings ;
4242import com .google .common .base .Preconditions ;
43- import com .google .common .collect .ImmutableList ;
4443import com .google .pubsub .v1 .PublishRequest ;
4544import com .google .pubsub .v1 .PublishResponse ;
4645import com .google .pubsub .v1 .PubsubMessage ;
4746import com .google .pubsub .v1 .TopicName ;
4847import com .google .pubsub .v1 .TopicNames ;
4948import java .io .IOException ;
49+ import java .util .ArrayList ;
5050import java .util .Collections ;
5151import java .util .Iterator ;
5252import java .util .LinkedList ;
@@ -197,7 +197,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
197197 }
198198
199199 message = messageTransform .apply (message );
200- OutstandingBatch batchToSend = null ;
200+ List < OutstandingBatch > batchesToSend = new ArrayList <>() ;
201201 final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
202202 messagesBatchLock .lock ();
203203 try {
@@ -206,19 +206,18 @@ public ApiFuture<String> publish(PubsubMessage message) {
206206 && hasBatchingBytes ()
207207 && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
208208 >= getMaxBatchBytes ()) {
209- batchToSend = messagesBatch .popOutstandingBatch ();
209+ batchesToSend . add ( messagesBatch .popOutstandingBatch () );
210210 }
211211
212- // Border case if the message to send is greater or equals to the max batch size then can't
213- // be included in the current batch and instead sent immediately.
214- if (!hasBatchingBytes () || outstandingPublish .messageSize < getMaxBatchBytes ()) {
215- messagesBatch .addMessage (outstandingPublish , outstandingPublish .messageSize );
212+ messagesBatch .addMessage (outstandingPublish , outstandingPublish .messageSize );
216213
217- // If after adding the message we have reached the batch max messages then we have a batch
218- // to send.
219- if (messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
220- batchToSend = messagesBatch .popOutstandingBatch ();
221- }
214+ // Border case: If the message to send is greater or equals to the max batch size then send it
215+ // immediately.
216+ // Alternatively if after adding the message we have reached the batch max messages then we
217+ // have a batch to send.
218+ if ((hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ())
219+ || messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
220+ batchesToSend .add (messagesBatch .popOutstandingBatch ());
222221 }
223222 // Setup the next duration based delivery alarm if there are messages batched.
224223 setupAlarm ();
@@ -228,32 +227,17 @@ && hasBatchingBytes()
228227
229228 messagesWaiter .incrementPendingMessages (1 );
230229
231- if (batchToSend != null ) {
232- logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
233- final OutstandingBatch finalBatchToSend = batchToSend ;
234- executor .execute (
235- new Runnable () {
236- @ Override
237- public void run () {
238- publishOutstandingBatch (finalBatchToSend );
239- }
240- });
241- }
242-
243- // If the message is over the size limit, it was not added to the pending messages and it will
244- // be sent in its own batch immediately.
245- if (hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ()) {
246- logger .log (
247- Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
248- executor .execute (
249- new Runnable () {
250- @ Override
251- public void run () {
252- publishOutstandingBatch (
253- new OutstandingBatch (
254- ImmutableList .of (outstandingPublish ), outstandingPublish .messageSize ));
255- }
256- });
230+ if (!batchesToSend .isEmpty ()) {
231+ for (final OutstandingBatch batch : batchesToSend ) {
232+ logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
233+ executor .execute (
234+ new Runnable () {
235+ @ Override
236+ public void run () {
237+ publishOutstandingBatch (batch );
238+ }
239+ });
240+ }
257241 }
258242
259243 return outstandingPublish .publishResult ;
0 commit comments