@@ -116,7 +116,7 @@ private Publisher(Builder builder) throws IOException {
116116 this .batchingSettings = builder .batchingSettings ;
117117 this .messageTransform = builder .messageTransform ;
118118
119- messagesBatch = new MessagesBatch ();
119+ messagesBatch = new MessagesBatch (batchingSettings );
120120 messagesBatchLock = new ReentrantLock ();
121121 activeAlarm = new AtomicBoolean (false );
122122 executor = builder .executorProvider .getExecutor ();
@@ -196,29 +196,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
196196 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
197197 }
198198
199- message = messageTransform . apply ( message );
200- List < OutstandingBatch > batchesToSend = new ArrayList <>( );
201- final OutstandingPublish outstandingPublish = new OutstandingPublish ( message ) ;
199+ final OutstandingPublish outstandingPublish =
200+ new OutstandingPublish ( messageTransform . apply ( message ) );
201+ List < OutstandingBatch > batchesToSend ;
202202 messagesBatchLock .lock ();
203203 try {
204- // Check if the next message makes the current batch exceed the max batch byte size.
205- if (!messagesBatch .isEmpty ()
206- && hasBatchingBytes ()
207- && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
208- >= getMaxBatchBytes ()) {
209- batchesToSend .add (messagesBatch .popOutstandingBatch ());
210- }
211-
212- messagesBatch .addMessage (outstandingPublish , outstandingPublish .messageSize );
213-
214- // Border case: If the message to send is greater or equals to the max batch size then send it
215- // immediately.
216- // Alternatively if after adding the message we have reached the batch max messages then we
217- // have a batch to send.
218- if ((hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ())
219- || messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
220- batchesToSend .add (messagesBatch .popOutstandingBatch ());
221- }
204+ batchesToSend = messagesBatch .add (outstandingPublish );
222205 // Setup the next duration based delivery alarm if there are messages batched.
223206 setupAlarm ();
224207 } finally {
@@ -378,10 +361,6 @@ public BatchingSettings getBatchingSettings() {
378361 return batchingSettings ;
379362 }
380363
381- private long getMaxBatchBytes () {
382- return getBatchingSettings ().getRequestByteThreshold ();
383- }
384-
385364 /**
386365 * Schedules immediate publishing of any outstanding messages and waits until all are processed.
387366 *
@@ -410,10 +389,6 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
410389 return backgroundResources .awaitTermination (duration , unit );
411390 }
412391
413- private boolean hasBatchingBytes () {
414- return getMaxBatchBytes () > 0 ;
415- }
416-
417392 /**
418393 * Constructs a new {@link Builder} using the given topic.
419394 *
@@ -616,8 +591,14 @@ public Publisher build() throws IOException {
616591 }
617592
618593 private static class MessagesBatch {
619- private List <OutstandingPublish > messages = new LinkedList <>() ;
594+ private List <OutstandingPublish > messages ;
620595 private int batchedBytes ;
596+ private final BatchingSettings batchingSettings ;
597+
598+ public MessagesBatch (BatchingSettings batchingSettings ) {
599+ this .batchingSettings = batchingSettings ;
600+ reset ();
601+ }
621602
622603 private OutstandingBatch popOutstandingBatch () {
623604 OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
@@ -638,13 +619,40 @@ private int getBatchedBytes() {
638619 return batchedBytes ;
639620 }
640621
641- private void addMessage (OutstandingPublish message , int messageSize ) {
642- messages .add (message );
643- batchedBytes += messageSize ;
644- }
645-
646622 private int getMessagesCount () {
647623 return messages .size ();
648624 }
625+
626+ private boolean hasBatchingBytes () {
627+ return getMaxBatchBytes () > 0 ;
628+ }
629+
630+ private long getMaxBatchBytes () {
631+ return batchingSettings .getRequestByteThreshold ();
632+ }
633+
634+ private List <OutstandingBatch > add (OutstandingPublish outstandingPublish ) {
635+ List <OutstandingBatch > batchesToSend = new ArrayList <>();
636+ // Check if the next message makes the current batch exceed the max batch byte size.
637+ if (!isEmpty ()
638+ && hasBatchingBytes ()
639+ && getBatchedBytes () + outstandingPublish .messageSize >= getMaxBatchBytes ()) {
640+ batchesToSend .add (popOutstandingBatch ());
641+ }
642+
643+ messages .add (outstandingPublish );
644+ batchedBytes += outstandingPublish .messageSize ;
645+
646+ // Border case: If the message to send is greater or equals to the max batch size then send it
647+ // immediately.
648+ // Alternatively if after adding the message we have reached the batch max messages then we
649+ // have a batch to send.
650+ if ((hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ())
651+ || getMessagesCount () == batchingSettings .getElementCountThreshold ()) {
652+ batchesToSend .add (popOutstandingBatch ());
653+ }
654+
655+ return batchesToSend ;
656+ }
649657 }
650658}
0 commit comments