-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Support size based threshold for pauseless consumption #15347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support size based threshold for pauseless consumption #15347
Conversation
441f02e to
6d71ea7
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #15347 +/- ##
============================================
+ Coverage 61.75% 63.58% +1.83%
- Complexity 207 1374 +1167
============================================
Files 2436 2785 +349
Lines 133233 157211 +23978
Branches 20636 24137 +3501
============================================
+ Hits 82274 99962 +17688
- Misses 44911 49703 +4792
- Partials 6048 7546 +1498
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
6d71ea7 to
b3fb5e1
Compare
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
Show resolved
Hide resolved
| } | ||
| if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) { | ||
| return Arrays.asList(tableConfig.getIndexingConfig().getStreamConfigs()); | ||
| return List.of(tableConfig.getIndexingConfig().getStreamConfigs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR: getStreamConfigMaps() seems to be performing validations on the streaamConfigMaps in case of multi-stream ingestion.
This should not be done in a getter. The validate function in the TableConfigUtils relies on this to perform these checks when the tableConfig is added/ updated. Ideally, there should be a separate function for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. We should address this in a separate PR
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
Show resolved
Hide resolved
| Preconditions.checkState(committingSegmentZKMetadata.getStatus() != Status.DONE, | ||
| "Segment status for segment: %s should not be DONE", segmentName); | ||
| Status status = committingSegmentZKMetadata.getStatus(); | ||
| Preconditions.checkState(status == Status.IN_PROGRESS || status == Status.COMMITTING, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a check for pauseless enabled as well in addition to IN_PROGRESS. For pauseless tables, we should not jump directly to DONE from IN_PROGRESS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have the pauseless info at this level. The check is already performed in commitSegmentMetadataToDone().
| List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = | ||
| offsetsHaveToChange | ||
| ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions | ||
| offsetsHaveToChange ? Collections.emptyList() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted (auto-reformat)
| // offsets from metadata are not valid anymore; fetch for all partitions | ||
| : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); | ||
| // FIXME: Right now, we assume topics are sharing same offset criteria | ||
| OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use getFirstStreamConfigMap method here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need all configs in other method, so it is better to just read all configs once
KKcorps
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM logic wise! There are a lot of unnecessary formatting changes here though.
f91c40b to
17469c0
Compare
Break
FlushThresholdUpdaterinto 2 steps:This way pauseless consumption can update the stats properly.