@@ -96,9 +96,9 @@ public void regularCaseWithOffsetCatchup() {
9696 when (segMngrB0 .getSegment ()).thenReturn (mockSegment );
9797 when (segMngrB0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (1500 ));
9898
99- when (segMngrA0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (20 ));
100- when (segMngrA1 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (200 ));
101- when (segMngrB0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (2000 ));
99+ when (segMngrA0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (20 ));
100+ when (segMngrA1 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (200 ));
101+ when (segMngrB0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (2000 ));
102102 assertEquals (statusChecker .getNumConsumingSegmentsNotReachedIngestionCriteria (), 3 );
103103
104104 // current offset latest stream offset current time last ingestion time
@@ -156,9 +156,9 @@ public void regularCaseWithFreshnessCatchup() {
156156 when (tableDataManagerA .acquireSegment (segA1 )).thenReturn (segMngrA1 );
157157 when (tableDataManagerB .acquireSegment (segB0 )).thenReturn (segMngrB0 );
158158
159- when (segMngrA0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (20 ));
160- when (segMngrA1 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (200 ));
161- when (segMngrB0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (2000 ));
159+ when (segMngrA0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (20 ));
160+ when (segMngrA1 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (200 ));
161+ when (segMngrB0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (2000 ));
162162 when (segMngrA0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
163163 when (segMngrA1 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
164164 when (segMngrB0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
@@ -218,9 +218,9 @@ public void segmentBeingCommmitted() {
218218 when (tableDataManagerA .acquireSegment (segA1 )).thenReturn (segMngrA1 );
219219 when (tableDataManagerB .acquireSegment (segB0 )).thenReturn (segMngrB0 );
220220
221- when (segMngrA0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (20 ));
222- when (segMngrA1 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (200 ));
223- when (segMngrB0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (2000 ));
221+ when (segMngrA0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (20 ));
222+ when (segMngrA1 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (200 ));
223+ when (segMngrB0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (2000 ));
224224 when (segMngrA0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
225225 when (segMngrA1 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
226226 when (segMngrB0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
@@ -276,9 +276,9 @@ public void testCannotGetOffsetsOrFreshness() {
276276 when (tableDataManagerA .acquireSegment (segA1 )).thenReturn (segMngrA1 );
277277 when (tableDataManagerB .acquireSegment (segB0 )).thenReturn (segMngrB0 );
278278
279- when (segMngrA0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (20 ));
280- when (segMngrA1 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (200 ));
281- when (segMngrB0 .fetchLatestStreamOffset ()).thenReturn (null );
279+ when (segMngrA0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (20 ));
280+ when (segMngrA1 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (200 ));
281+ when (segMngrB0 .fetchLatestStreamOffset (5000 )).thenReturn (null );
282282 when (segMngrA0 .getCurrentOffset ()).thenReturn (null );
283283 when (segMngrA1 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
284284 when (segMngrB0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (0 ));
@@ -299,7 +299,7 @@ public void testCannotGetOffsetsOrFreshness() {
299299 // segB0 0 0 100 0
300300 setupLatestIngestionTimestamp (segMngrA0 , 89L );
301301 when (segMngrA0 .getCurrentOffset ()).thenReturn (new LongMsgOffset (20 ));
302- when (segMngrB0 .fetchLatestStreamOffset ()).thenReturn (new LongMsgOffset (0 ));
302+ when (segMngrB0 .fetchLatestStreamOffset (5000 )).thenReturn (new LongMsgOffset (0 ));
303303 assertEquals (statusChecker .getNumConsumingSegmentsNotReachedIngestionCriteria (), 0 );
304304 }
305305}
0 commit comments