@@ -61,6 +61,7 @@ class MessageDispatcher {
6161 private final ApiClock clock ;
6262
6363 private final Duration ackExpirationPadding ;
64+ private final Duration maxAckExtensionPeriod ;
6465 private final MessageReceiver receiver ;
6566 private final AckProcessor ackProcessor ;
6667
@@ -87,20 +88,27 @@ class MessageDispatcher {
8788 // it is not modified while inside the queue.
8889 // The hashcode and equals methods are explicitly not implemented to discourage
8990 // the use of this class as keys in maps or similar containers.
90- private static class ExtensionJob implements Comparable <ExtensionJob > {
91+ private class ExtensionJob implements Comparable <ExtensionJob > {
92+ Instant creation ;
9193 Instant expiration ;
9294 int nextExtensionSeconds ;
9395 ArrayList <AckHandler > ackHandlers ;
9496
9597 ExtensionJob (
96- Instant expiration , int initialAckDeadlineExtension , ArrayList <AckHandler > ackHandlers ) {
98+ Instant creation ,
99+ Instant expiration ,
100+ int initialAckDeadlineExtension ,
101+ ArrayList <AckHandler > ackHandlers ) {
102+ this .creation = creation ;
97103 this .expiration = expiration ;
98104 nextExtensionSeconds = initialAckDeadlineExtension ;
99105 this .ackHandlers = ackHandlers ;
100106 }
101107
102108 void extendExpiration (Instant now ) {
103- expiration = now .plus (Duration .standardSeconds (nextExtensionSeconds ));
109+ Instant possibleExtension = now .plus (Duration .standardSeconds (nextExtensionSeconds ));
110+ Instant maxExtension = creation .plus (maxAckExtensionPeriod );
111+ expiration = possibleExtension .isBefore (maxExtension ) ? possibleExtension : maxExtension ;
104112 nextExtensionSeconds = Math .min (2 * nextExtensionSeconds , MAX_ACK_DEADLINE_EXTENSION_SECS );
105113 }
106114
@@ -217,12 +225,14 @@ void sendAckOperations(
217225 MessageReceiver receiver ,
218226 AckProcessor ackProcessor ,
219227 Duration ackExpirationPadding ,
228+ Duration maxAckExtensionPeriod ,
220229 Distribution ackLatencyDistribution ,
221230 FlowController flowController ,
222231 ScheduledExecutorService executor ,
223232 ApiClock clock ) {
224233 this .executor = executor ;
225234 this .ackExpirationPadding = ackExpirationPadding ;
235+ this .maxAckExtensionPeriod = maxAckExtensionPeriod ;
226236 this .receiver = receiver ;
227237 this .ackProcessor = ackProcessor ;
228238 this .flowController = flowController ;
@@ -305,7 +315,11 @@ public void run() {
305315
306316 synchronized (outstandingAckHandlers ) {
307317 outstandingAckHandlers .add (
308- new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
318+ new ExtensionJob (
319+ new Instant (clock .millisTime ()),
320+ expiration ,
321+ INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
322+ ackHandlers ));
309323 }
310324 setupNextAckDeadlineExtensionAlarm (expiration );
311325
@@ -380,6 +394,13 @@ public void run() {
380394 && outstandingAckHandlers .peek ().expiration .compareTo (cutOverTime ) <= 0 ) {
381395 ExtensionJob job = outstandingAckHandlers .poll ();
382396
397+ if (maxAckExtensionPeriod .getMillis () > 0
398+ && job .creation .plus (maxAckExtensionPeriod ).compareTo (now ) <= 0 ) {
399+ // The job has expired, according to the maxAckExtensionPeriod, we are just going to
400+ // drop it.
401+ continue ;
402+ }
403+
383404 // If a message has already been acked, remove it, nothing to do.
384405 for (int i = 0 ; i < job .ackHandlers .size (); ) {
385406 if (job .ackHandlers .get (i ).acked .get ()) {
0 commit comments