2828import com .google .pubsub .v1 .PubsubMessage ;
2929import com .google .pubsub .v1 .ReceivedMessage ;
3030import java .util .ArrayList ;
31- import java .util .Collection ;
32- import java .util .HashMap ;
31+ import java .util .Collections ;
3332import java .util .HashSet ;
3433import java .util .Iterator ;
3534import java .util .List ;
36- import java .util .Map ;
35+ import java .util .PriorityQueue ;
3736import java .util .Set ;
3837import java .util .concurrent .ScheduledExecutorService ;
3938import java .util .concurrent .ScheduledFuture ;
@@ -68,8 +67,7 @@ class MessageDispatcher {
6867 private final FlowController flowController ;
6968 private final MessagesWaiter messagesWaiter ;
7069
71- // Map of outstanding messages (value) ordered by expiration time (key) in ascending order.
72- private final Map <ExpirationInfo , List <AckHandler >> outstandingAckHandlers ;
70+ private final PriorityQueue <ExtensionJob > outstandingAckHandlers ;
7371 private final Set <String > pendingAcks ;
7472 private final Set <String > pendingNacks ;
7573
@@ -82,40 +80,43 @@ class MessageDispatcher {
8280 // To keep track of number of seconds the receiver takes to process messages.
8381 private final Distribution ackLatencyDistribution ;
8482
85- private static class ExpirationInfo implements Comparable <ExpirationInfo > {
86- private final Clock clock ;
83+ // ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration.
84+ //
85+ // It is Comparable so that it may be put in a PriorityQueue.
86+ // For efficiency, it is also mutable, so great care should be taken to make sure
87+ // it is not modified while inside the queue.
88+ // The hashcode and equals methods are explicitly not implemented to discourage
89+ // the use of this class as keys in maps or similar containers.
90+ private static class ExtensionJob implements Comparable <ExtensionJob > {
8791 Instant expiration ;
8892 int nextExtensionSeconds ;
93+ ArrayList <AckHandler > ackHandlers ;
8994
90- ExpirationInfo ( Clock clock , Instant expiration , int initialAckDeadlineExtension ) {
91- this . clock = clock ;
95+ ExtensionJob (
96+ Instant expiration , int initialAckDeadlineExtension , ArrayList < AckHandler > ackHandlers ) {
9297 this .expiration = expiration ;
9398 nextExtensionSeconds = initialAckDeadlineExtension ;
99+ this .ackHandlers = ackHandlers ;
94100 }
95101
96- void extendExpiration () {
97- expiration = new Instant ( clock . millis ()) .plus (Duration .standardSeconds (nextExtensionSeconds ));
102+ void extendExpiration (Instant now ) {
103+ expiration = now .plus (Duration .standardSeconds (nextExtensionSeconds ));
98104 nextExtensionSeconds = Math .min (2 * nextExtensionSeconds , MAX_ACK_DEADLINE_EXTENSION_SECS );
99105 }
100106
101107 @ Override
102- public int hashCode ( ) {
103- return expiration .hashCode ( );
108+ public int compareTo ( ExtensionJob other ) {
109+ return expiration .compareTo ( other . expiration );
104110 }
105111
106- @ Override
107- public boolean equals ( Object obj ) {
108- if (!( obj instanceof ExpirationInfo ) ) {
109- return false ;
112+ public String toString () {
113+ ArrayList < String > ackIds = new ArrayList <>();
114+ for ( AckHandler ah : ackHandlers ) {
115+ ackIds . add ( ah . ackId ) ;
110116 }
111-
112- ExpirationInfo other = (ExpirationInfo ) obj ;
113- return expiration .equals (other .expiration );
114- }
115-
116- @ Override
117- public int compareTo (ExpirationInfo other ) {
118- return expiration .compareTo (other .expiration );
117+ return String .format (
118+ "ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}" ,
119+ expiration , nextExtensionSeconds , ackIds );
119120 }
120121 }
121122
@@ -137,6 +138,12 @@ static class PendingModifyAckDeadline {
137138 public void addAckId (String ackId ) {
138139 ackIds .add (ackId );
139140 }
141+
142+ public String toString () {
143+ return String .format (
144+ "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}" ,
145+ deadlineExtensionSeconds , ackIds );
146+ }
140147 }
141148
142149 /**
@@ -217,7 +224,7 @@ void sendAckOperations(
217224 this .receiver = receiver ;
218225 this .ackProcessor = ackProcessor ;
219226 this .flowController = flowController ;
220- outstandingAckHandlers = new HashMap <>();
227+ outstandingAckHandlers = new PriorityQueue <>();
221228 pendingAcks = new HashSet <>();
222229 pendingNacks = new HashSet <>();
223230 // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
@@ -257,20 +264,14 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
257264 }
258265 Instant now = new Instant (clock .millis ());
259266 int totalByteCount = 0 ;
260- final List <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
267+ final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
261268 for (ReceivedMessage pubsubMessage : responseMessages ) {
262269 int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
263270 totalByteCount += messageSize ;
264271 ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
265272 }
266- ExpirationInfo expiration =
267- new ExpirationInfo (
268- clock , now .plus (messageDeadlineSeconds * 1000 ), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS );
269- synchronized (outstandingAckHandlers ) {
270- addOutstadingAckHandlers (expiration , ackHandlers );
271- }
273+ Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
272274 logger .debug ("Received {} messages at {}" , responseMessages .size (), now );
273- setupNextAckDeadlineExtensionAlarm (expiration );
274275
275276 messagesWaiter .incrementPendingMessages (responseMessages .size ());
276277 Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
@@ -285,21 +286,20 @@ public void run() {
285286 }
286287 });
287288 }
289+
290+ synchronized (outstandingAckHandlers ) {
291+ outstandingAckHandlers .add (
292+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
293+ }
294+ setupNextAckDeadlineExtensionAlarm (expiration );
295+
288296 try {
289297 flowController .reserve (receivedMessagesCount , totalByteCount );
290298 } catch (FlowController .FlowControlException unexpectedException ) {
291299 throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
292300 }
293301 }
294302
295- private void addOutstadingAckHandlers (
296- ExpirationInfo expiration , final List <AckHandler > ackHandlers ) {
297- if (!outstandingAckHandlers .containsKey (expiration )) {
298- outstandingAckHandlers .put (expiration , new ArrayList <AckHandler >(ackHandlers .size ()));
299- }
300- outstandingAckHandlers .get (expiration ).addAll (ackHandlers );
301- }
302-
303303 private void setupPendingAcksAlarm () {
304304 alarmsLock .lock ();
305305 try {
@@ -354,41 +354,49 @@ public void run() {
354354 now ,
355355 cutOverTime ,
356356 ackExpirationPadding );
357- ExpirationInfo nextScheduleExpiration = null ;
357+ Instant nextScheduleExpiration = null ;
358358 List <PendingModifyAckDeadline > modifyAckDeadlinesToSend = new ArrayList <>();
359359
360+ // Holding area for jobs we'll put back into the queue
361+ // so we don't process the same job twice.
362+ List <ExtensionJob > renewJobs = new ArrayList <>();
363+
360364 synchronized (outstandingAckHandlers ) {
361- for (ExpirationInfo messageExpiration : outstandingAckHandlers .keySet ()) {
362- if (messageExpiration .expiration .compareTo (cutOverTime ) <= 0 ) {
363- Collection <AckHandler > expiringAcks = outstandingAckHandlers .get (messageExpiration );
364- outstandingAckHandlers .remove (messageExpiration );
365- List <AckHandler > renewedAckHandlers = new ArrayList <>(expiringAcks .size ());
366- messageExpiration .extendExpiration ();
367- int extensionSeconds =
368- Ints .saturatedCast (
369- new Interval (now , messageExpiration .expiration )
370- .toDuration ()
371- .getStandardSeconds ());
372- PendingModifyAckDeadline pendingModAckDeadline =
373- new PendingModifyAckDeadline (extensionSeconds );
374- for (AckHandler ackHandler : expiringAcks ) {
375- if (ackHandler .acked .get ()) {
376- continue ;
377- }
378- pendingModAckDeadline .addAckId (ackHandler .ackId );
379- renewedAckHandlers .add (ackHandler );
380- }
381- modifyAckDeadlinesToSend .add (pendingModAckDeadline );
382- if (!renewedAckHandlers .isEmpty ()) {
383- addOutstadingAckHandlers (messageExpiration , renewedAckHandlers );
365+ while (!outstandingAckHandlers .isEmpty ()
366+ && outstandingAckHandlers .peek ().expiration .compareTo (cutOverTime ) <= 0 ) {
367+ ExtensionJob job = outstandingAckHandlers .poll ();
368+
369+ // If a message has already been acked, remove it, nothing to do.
370+ for (int i = 0 ; i < job .ackHandlers .size (); ) {
371+ if (job .ackHandlers .get (i ).acked .get ()) {
372+ Collections .swap (job .ackHandlers , i , job .ackHandlers .size () - 1 );
373+ job .ackHandlers .remove (job .ackHandlers .size () - 1 );
384374 } else {
385- outstandingAckHandlers . remove ( messageExpiration ) ;
375+ i ++ ;
386376 }
387377 }
388- if ( nextScheduleExpiration == null
389- || nextScheduleExpiration . expiration . isAfter ( messageExpiration . expiration )) {
390- nextScheduleExpiration = messageExpiration ;
378+
379+ if ( job . ackHandlers . isEmpty ( )) {
380+ continue ;
391381 }
382+
383+ job .extendExpiration (now );
384+ int extensionSeconds =
385+ Ints .saturatedCast (
386+ new Interval (now , job .expiration ).toDuration ().getStandardSeconds ());
387+ PendingModifyAckDeadline pendingModAckDeadline =
388+ new PendingModifyAckDeadline (extensionSeconds );
389+ for (AckHandler ackHandler : job .ackHandlers ) {
390+ pendingModAckDeadline .addAckId (ackHandler .ackId );
391+ }
392+ modifyAckDeadlinesToSend .add (pendingModAckDeadline );
393+ renewJobs .add (job );
394+ }
395+ for (ExtensionJob job : renewJobs ) {
396+ outstandingAckHandlers .add (job );
397+ }
398+ if (!outstandingAckHandlers .isEmpty ()) {
399+ nextScheduleExpiration = outstandingAckHandlers .peek ().expiration ;
392400 }
393401 }
394402
@@ -404,8 +412,8 @@ public void run() {
404412 }
405413 }
406414
407- private void setupNextAckDeadlineExtensionAlarm (ExpirationInfo messageExpiration ) {
408- Instant possibleNextAlarmTime = messageExpiration . expiration .minus (ackExpirationPadding );
415+ private void setupNextAckDeadlineExtensionAlarm (Instant expiration ) {
416+ Instant possibleNextAlarmTime = expiration .minus (ackExpirationPadding );
409417 alarmsLock .lock ();
410418 try {
411419 if (nextAckDeadlineExtensionAlarmTime .isAfter (possibleNextAlarmTime )) {
0 commit comments