@@ -284,9 +284,12 @@ public void testModifyAckDeadline() throws Exception {
284284 // Send messages to be acked
285285 List <String > testAckIdsBatch = ImmutableList .of ("A" , "B" , "C" );
286286 testReceiver .setExplicitAck (true );
287+ // A modify ack deadline should be schedule for the next 9s
288+ fakeExecutor .setupScheduleExpectation (Duration .standardSeconds (9 ));
287289 sendMessages (testAckIdsBatch );
290+ // To ensure first modify ack deadline got scheduled
291+ fakeExecutor .waitForExpectedWork ();
288292
289- // Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding
290293 fakeExecutor .advanceTime (Duration .standardSeconds (9 ));
291294
292295 assertEquivalentWithTransformation (
@@ -299,17 +302,16 @@ public ModifyAckDeadline apply(String ack) {
299302 }
300303 });
301304
302- // Trigger modify ack deadline sending - 2s of the renewed deadlines
303- fakeExecutor .advanceTime (Duration .standardSeconds (2 ));
305+ fakeExecutor .advanceTime (Duration .standardSeconds (1 ));
304306
305307 assertEquivalentWithTransformation (
306308 testAckIdsBatch ,
307309 fakeSubscriberServiceImpl .waitAndConsumeModifyAckDeadlines (3 ),
308310 new Function <String , ModifyAckDeadline >() {
309311 @ Override
310312 public ModifyAckDeadline apply (String ack ) {
311- return new ModifyAckDeadline (ack , 2 ); // It is expected that the deadline is renewed
312- // only two more seconds to not pass the max
313+ return new ModifyAckDeadline (ack , 3 ); // It is expected that the deadline is renewed
314+ // only three more seconds to not pass the max
313315 // ack deadline ext.
314316 }
315317 });
@@ -332,9 +334,13 @@ public void testModifyAckDeadline_defaultMaxExtensionPeriod() throws Exception {
332334 // Send messages to be acked
333335 List <String > testAckIdsBatch = ImmutableList .of ("A" , "B" , "C" );
334336 testReceiver .setExplicitAck (true );
337+ // A modify ack deadline should be schedule for the next 9s
338+ fakeExecutor .setupScheduleExpectation (Duration .standardSeconds (9 ));
335339 sendMessages (testAckIdsBatch );
340+ // To ensure the first modify ack deadlines got scheduled
341+ fakeExecutor .waitForExpectedWork ();
336342
337- // Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding
343+ // Next modify ack deadline should be schedule in the next 1s
338344 fakeExecutor .advanceTime (Duration .standardSeconds (9 ));
339345
340346 assertEquivalentWithTransformation (
@@ -347,12 +353,12 @@ public ModifyAckDeadline apply(String ack) {
347353 }
348354 });
349355
350- int timeIncrementSecs = INITIAL_ACK_DEADLINE_EXTENSION_SECS * 2 ; // Second time increment
356+ fakeExecutor .advanceTime (Duration .standardSeconds (1 ));
357+ int timeIncrementSecs = INITIAL_ACK_DEADLINE_EXTENSION_SECS ; // Second time increment
351358
352359 // Check ack deadline extensions while the current time has not reached 60 minutes
353- while (fakeExecutor .getClock ().millisTime () + (timeIncrementSecs * 1000 ) < 1000 * 60 * 60 ) {
354- fakeExecutor .advanceTime (Duration .standardSeconds (timeIncrementSecs ));
355-
360+ while (fakeExecutor .getClock ().millisTime () + timeIncrementSecs - 1 < 1000 * 60 * 60 ) {
361+ timeIncrementSecs *= 2 ;
356362 final int expectedIncrementSecs = Math .min (600 , timeIncrementSecs );
357363 assertEquivalentWithTransformation (
358364 testAckIdsBatch ,
@@ -363,7 +369,7 @@ public ModifyAckDeadline apply(String ack) {
363369 return new ModifyAckDeadline (ack , expectedIncrementSecs );
364370 }
365371 });
366- timeIncrementSecs *= 2 ;
372+ fakeExecutor . advanceTime ( Duration . standardSeconds ( timeIncrementSecs - 1 ));
367373 }
368374
369375 // No more modify ack deadline extension should be triggered at this point
0 commit comments