Skip to content

Commit 5e3e804

Browse files
committed
make maxWaitTimeMs a function param
1 parent 561338a commit 5e3e804

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,7 +1392,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
13921392
_resourceTmpDir.mkdirs();
13931393
}
13941394
_state = State.INITIAL_CONSUMING;
1395-
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset();
1395+
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
13961396
_consumeStartTime = now();
13971397
setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
13981398
_segmentCommitterFactory =
@@ -1429,8 +1429,7 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) {
14291429
}
14301430
}
14311431

1432-
public StreamPartitionMsgOffset fetchLatestStreamOffset() {
1433-
long maxWaitTimeMs = 5000;
1432+
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
14341433
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId,
14351434
_partitionGroupId)) {
14361435
return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);

pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataMan
7474
// message is too old to pass the freshness check. We check this condition separately to avoid hitting
7575
// the stream consumer to check partition count if we're already caught up.
7676
StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
77-
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset();
77+
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
7878
if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
7979
_logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}."
8080
+ "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs,

pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ public void regularCaseWithOffsetCatchup() {
9696
when(segMngrB0.getSegment()).thenReturn(mockSegment);
9797
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));
9898

99-
when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
100-
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
101-
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
99+
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
100+
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
101+
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
102102
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
103103

104104
// current offset latest stream offset current time last ingestion time
@@ -156,9 +156,9 @@ public void regularCaseWithFreshnessCatchup() {
156156
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
157157
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
158158

159-
when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
160-
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
161-
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
159+
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
160+
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
161+
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
162162
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
163163
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
164164
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -218,9 +218,9 @@ public void segmentBeingCommmitted() {
218218
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
219219
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
220220

221-
when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
222-
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
223-
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(2000));
221+
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
222+
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
223+
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
224224
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
225225
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
226226
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -276,9 +276,9 @@ public void testCannotGetOffsetsOrFreshness() {
276276
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
277277
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
278278

279-
when(segMngrA0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(20));
280-
when(segMngrA1.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(200));
281-
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(null);
279+
when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
280+
when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
281+
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(null);
282282
when(segMngrA0.getCurrentOffset()).thenReturn(null);
283283
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
284284
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -299,7 +299,7 @@ public void testCannotGetOffsetsOrFreshness() {
299299
// segB0 0 0 100 0
300300
setupLatestIngestionTimestamp(segMngrA0, 89L);
301301
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
302-
when(segMngrB0.fetchLatestStreamOffset()).thenReturn(new LongMsgOffset(0));
302+
when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(0));
303303
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
304304
}
305305
}

0 commit comments

Comments
 (0)