3232import java .util .ArrayList ;
3333import java .util .Collections ;
3434import java .util .Deque ;
35- import java .util .HashSet ;
3635import java .util .LinkedList ;
3736import java .util .List ;
3837import java .util .PriorityQueue ;
39- import java .util .Set ;
38+ import java .util .concurrent . LinkedBlockingQueue ;
4039import java .util .concurrent .ScheduledExecutorService ;
4140import java .util .concurrent .ScheduledFuture ;
4241import java .util .concurrent .TimeUnit ;
@@ -73,8 +72,9 @@ class MessageDispatcher {
7372 private final MessageWaiter messagesWaiter ;
7473
7574 private final PriorityQueue <ExtensionJob > outstandingAckHandlers ;
76- private final Set <String > pendingAcks ;
77- private final Set <String > pendingNacks ;
75+ private final LinkedBlockingQueue <String > pendingAcks = new LinkedBlockingQueue <>();
76+ private final LinkedBlockingQueue <String > pendingNacks = new LinkedBlockingQueue <>();
77+ private final LinkedBlockingQueue <String > pendingReceipts = new LinkedBlockingQueue <>();
7878
7979 private final Lock alarmsLock ;
8080 // The deadline should be set by the subscriber connection before use,
@@ -185,47 +185,41 @@ private class AckHandler implements FutureCallback<AckReply> {
185185 receivedTimeMillis = clock .millisTime ();
186186 }
187187
188+ private void onBoth (LinkedBlockingQueue <String > destination ) {
189+ acked .getAndSet (true );
190+ destination .add (ackId );
191+ flowController .release (1 , outstandingBytes );
192+ messagesWaiter .incrementPendingMessages (-1 );
193+ processOutstandingBatches ();
194+ }
195+
188196 @ Override
189197 public void onFailure (Throwable t ) {
190198 logger .log (
191199 Level .WARNING ,
192200 "MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked." ,
193201 t );
194- acked .getAndSet (true );
195- synchronized (pendingNacks ) {
196- pendingNacks .add (ackId );
197- }
198- setupPendingAcksAlarm ();
199- flowController .release (1 , outstandingBytes );
200- messagesWaiter .incrementPendingMessages (-1 );
201- processOutstandingBatches ();
202+ onBoth (pendingNacks );
202203 }
203204
204205 @ Override
205206 public void onSuccess (AckReply reply ) {
206- acked . getAndSet ( true ) ;
207+ LinkedBlockingQueue < String > destination ;
207208 switch (reply ) {
208209 case ACK :
209- synchronized (pendingAcks ) {
210- pendingAcks .add (ackId );
211- }
210+ destination = pendingAcks ;
212211 // Record the latency rounded to the next closest integer.
213212 ackLatencyDistribution .record (
214213 Ints .saturatedCast (
215214 (long ) Math .ceil ((clock .millisTime () - receivedTimeMillis ) / 1000D )));
216215 break ;
217216 case NACK :
218- synchronized (pendingNacks ) {
219- pendingNacks .add (ackId );
220- }
217+ destination = pendingNacks ;
221218 break ;
222219 default :
223220 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
224221 }
225- setupPendingAcksAlarm ();
226- flowController .release (1 , outstandingBytes );
227- messagesWaiter .incrementPendingMessages (-1 );
228- processOutstandingBatches ();
222+ onBoth (destination );
229223 }
230224 }
231225
@@ -254,8 +248,6 @@ void sendAckOperations(
254248 this .flowController = flowController ;
255249 this .outstandingMessageBatches = outstandingMessageBatches ;
256250 outstandingAckHandlers = new PriorityQueue <>();
257- pendingAcks = new HashSet <>();
258- pendingNacks = new HashSet <>();
259251 // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
260252 this .ackLatencyDistribution = ackLatencyDistribution ;
261253 alarmsLock = new ReentrantLock ();
@@ -264,6 +256,25 @@ void sendAckOperations(
264256 this .clock = clock ;
265257 }
266258
259+ public void start () {
260+ pendingAcksAlarm =
261+ systemExecutor .scheduleWithFixedDelay (
262+ new Runnable () {
263+ @ Override
264+ public void run () {
265+ try {
266+ processOutstandingAckOperations ();
267+ } catch (Throwable t ) {
268+ // Catch everything so that one run failing doesn't prevent subsequent runs.
269+ logger .log (Level .WARNING , "failed to send acks/nacks" , t );
270+ }
271+ }
272+ },
273+ PENDING_ACKS_SEND_DELAY .toMillis (),
274+ PENDING_ACKS_SEND_DELAY .toMillis (),
275+ TimeUnit .MILLISECONDS );
276+ }
277+
267278 public void stop () {
268279 messagesWaiter .waitNoMessages ();
269280 alarmsLock .lock ();
@@ -272,6 +283,10 @@ public void stop() {
272283 ackDeadlineExtensionAlarm .cancel (true );
273284 ackDeadlineExtensionAlarm = null ;
274285 }
286+ if (pendingAcksAlarm != null ) {
287+ pendingAcksAlarm .cancel (false );
288+ pendingAcksAlarm = null ;
289+ }
275290 } finally {
276291 alarmsLock .unlock ();
277292 }
@@ -328,6 +343,9 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
328343 return ;
329344 }
330345 messagesWaiter .incrementPendingMessages (messages .size ());
346+ for (ReceivedMessage message : messages ) {
347+ pendingReceipts .add (message .getAckId ());
348+ }
331349
332350 OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch (doneCallback );
333351 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(messages .size ());
@@ -421,32 +439,6 @@ public void run() {
421439 }
422440 }
423441
424- private void setupPendingAcksAlarm () {
425- alarmsLock .lock ();
426- try {
427- if (pendingAcksAlarm == null ) {
428- pendingAcksAlarm =
429- systemExecutor .schedule (
430- new Runnable () {
431- @ Override
432- public void run () {
433- alarmsLock .lock ();
434- try {
435- pendingAcksAlarm = null ;
436- } finally {
437- alarmsLock .unlock ();
438- }
439- processOutstandingAckOperations ();
440- }
441- },
442- PENDING_ACKS_SEND_DELAY .toMillis (),
443- TimeUnit .MILLISECONDS );
444- }
445- } finally {
446- alarmsLock .unlock ();
447- }
448- }
449-
450442 private class AckDeadlineAlarm implements Runnable {
451443 @ Override
452444 public void run () {
@@ -574,31 +566,26 @@ private void processOutstandingAckOperations(
574566 List <PendingModifyAckDeadline > ackDeadlineExtensions ) {
575567 List <PendingModifyAckDeadline > modifyAckDeadlinesToSend =
576568 Lists .newArrayList (ackDeadlineExtensions );
577- List <String > acksToSend = new ArrayList <>(pendingAcks .size ());
578- synchronized (pendingAcks ) {
579- if (!pendingAcks .isEmpty ()) {
580- try {
581- acksToSend = new ArrayList <>(pendingAcks );
582- logger .log (Level .FINER , "Sending {0} acks" , acksToSend .size ());
583- } finally {
584- pendingAcks .clear ();
585- }
586- }
587- }
569+
570+ List <String > acksToSend = new ArrayList <>();
571+ pendingAcks .drainTo (acksToSend );
572+ logger .log (Level .FINER , "Sending {0} acks" , acksToSend .size ());
573+
588574 PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline (0 );
589- synchronized (pendingNacks ) {
590- if (!pendingNacks .isEmpty ()) {
591- try {
592- for (String ackId : pendingNacks ) {
593- nacksToSend .addAckId (ackId );
594- }
595- logger .log (Level .FINER , "Sending {0} nacks" , pendingNacks .size ());
596- } finally {
597- pendingNacks .clear ();
598- }
599- modifyAckDeadlinesToSend .add (nacksToSend );
600- }
575+ pendingNacks .drainTo (nacksToSend .ackIds );
576+ logger .log (Level .FINER , "Sending {0} nacks" , nacksToSend .ackIds .size ());
577+ if (!nacksToSend .ackIds .isEmpty ()) {
578+ modifyAckDeadlinesToSend .add (nacksToSend );
601579 }
580+
581+ PendingModifyAckDeadline receiptsToSend =
582+ new PendingModifyAckDeadline (getMessageDeadlineSeconds ());
583+ pendingReceipts .drainTo (receiptsToSend .ackIds );
584+ logger .log (Level .FINER , "Sending {0} receipts" , receiptsToSend .ackIds .size ());
585+ if (!receiptsToSend .ackIds .isEmpty ()) {
586+ modifyAckDeadlinesToSend .add (receiptsToSend );
587+ }
588+
602589 ackProcessor .sendAckOperations (acksToSend , modifyAckDeadlinesToSend );
603590 }
604591}
0 commit comments