Part-2: Add Combine and Segment Level Operators for Time Series #13999
Conversation
Codecov Report

Attention: Patch coverage is …

```diff
@@             Coverage Diff              @@
##             master   #13999      +/-   ##
============================================
- Coverage     61.75%   57.89%    -3.86%
+ Complexity      207      199        -8
============================================
  Files          2436     2621      +185
  Lines        133233   143470    +10237
  Branches      20636    22015     +1379
============================================
+ Hits          82274    83064      +790
- Misses        44911    53914     +9003
- Partials       6048     6492      +444
```

Flags with carried-forward coverage won't be shown. View full report in Codecov by Sentry.
```java
private final TimeUnit _timeUnit;
private final TimeBuckets _timeBuckets;
private final Long _offsetSeconds;
private final ExpressionContext _valueExpression;
```
What are the expressions allowed for valueExpression? I thought this was the name of the value column.
Any "transform expression" is allowed. Transform expression in Pinot SQL is anything that is allowed in the SQL Transform/Project phase. e.g. SELECT 100*(colname/3) FROM ... is allowed in SQL, so in say M3QL, we can choose to allow:
fetch table:<table-name> value:"100*(colname/3)" | sum city_id ...
@kishoreg : might have some ideas on how you folks can support it in PromQL.
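A minimal sketch of that idea, assuming the projection phase has already evaluated the transform expression for a block of rows; the class and method names here (SeriesValueExtractor, accumulate) are hypothetical, not this PR's API:

```java
import java.util.Map;

// Hypothetical sketch: fold the per-row results of a transform expression
// (e.g. "100*(colname/3)") into per-series time buckets. Names are assumptions.
public class SeriesValueExtractor {
  /**
   * @param doubleValues result of the value expression for each row, already
   *                     computed by the projection phase
   * @param timeValueIndexes time-bucket index computed for each row
   * @param seriesHash hash identifying the series (its group-by tag values)
   */
  public void accumulate(double[] doubleValues, int[] timeValueIndexes, int numRows,
      Map<Long, double[]> seriesValues, long seriesHash, int numBuckets) {
    double[] buckets = seriesValues.computeIfAbsent(seriesHash, k -> new double[numBuckets]);
    for (int row = 0; row < numRows; row++) {
      buckets[timeValueIndexes[row]] += doubleValues[row]; // SUM aggregation as an example
    }
  }
}
```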
```java
  currentTimeSeries = currentTimeSeriesList.get(0);
}
TimeSeries newTimeSeriesToMerge = entry.getValue().get(0);
if (currentTimeSeries == null) {
```
Can you please explain the currentTimeSeries == null condition? If it is null, why are we adding a new entry?
Under the combine operator we run per-segment operators, and the combine operator merges their results across segments using the merger. It could be that a given segment yields a series (e.g. city_name='Boston') that didn't exist in any other segment. In that case the current time series block won't have this series yet and this condition is met, so there is nothing to merge and we can simply add the series to the currentTimeSeriesBlock. A sketch of this pattern is below.
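A minimal sketch of that merge pattern, assuming a generic series type and a pluggable merge function; this is illustrative, not the PR's exact code:

```java
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the cross-segment merge in a combine operator.
// SERIES stands in for the engine's time-series type; the merger is a
// pluggable binary operator (e.g. element-wise sum of bucket values).
public class SeriesMergeSketch<SERIES> {
  public void mergeSegmentResult(Map<Long, SERIES> currentBlock,
      Map<Long, SERIES> segmentResult, BinaryOperator<SERIES> merger) {
    for (Map.Entry<Long, SERIES> entry : segmentResult.entrySet()) {
      SERIES current = currentBlock.get(entry.getKey());
      if (current == null) {
        // Series seen only in this segment so far (e.g. city_name='Boston'):
        // nothing to merge; just add it to the combined block.
        currentBlock.put(entry.getKey(), entry.getValue());
      } else {
        currentBlock.put(entry.getKey(), merger.apply(current, entry.getValue()));
      }
    }
  }
}
```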
```java
  timeValues = applyTimeshift(_timeOffset, timeValues);
}
int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
```
We are thinking of storing labels in Prometheus messages as JSON blobs. To filter on specific label values, the groupByExpression will be used. Will this work for such columns?

```
http_requests_total{status="200", method="GET"}
```
Yup, this will work. Group by can have arbitrary expressions, though I need to rename ScanFilterAndProjectPlanNode to something like PinotLeafPlanNode and change the groupByColumns variable in it to groupByExpressions.
Added a task here: #13957
In the quickstart I will add an example query leveraging this; a hedged sketch of the idea is below.
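For illustration, the group-by entries could be JSON-extraction expressions over the label blob. jsonExtractScalar is an existing Pinot transform function, but the PlanNodeSketch type below is an assumption, not the actual plan node:

```java
import java.util.List;

// Hypothetical sketch: group-by expressions over a JSON label column.
// Each entry is an arbitrary transform expression rather than a plain column
// name, e.g. extracting Prometheus-style labels from a JSON blob.
public class PlanNodeSketch {
  private final List<String> _groupByExpressions = List.of(
      "jsonExtractScalar(labels, '$.status', 'STRING')",
      "jsonExtractScalar(labels, '$.method', 'STRING')");

  public List<String> getGroupByExpressions() {
    return _groupByExpressions;
  }
}
```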
```java
}
int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
```
Why the hard-coded 1024 initial capacity? Can it overflow?
Yeah, this is the initial capacity. I set it thinking it is a reasonable number, but I don't have a benchmark to verify it is efficient in most scenarios. I can revert to the default constructor if required. Lmk.
I think it is fine, but maybe we can add a TODO to look out for this in perf runs, e.g. as sketched below.
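Something like this (note a HashMap cannot overflow; the initial capacity only controls when the first resize happens, so the worst case of a bad guess is wasted memory or extra rehashing):

```java
// TODO: Revisit this initial capacity in perf runs. 1024 is an unbenchmarked
// guess; HashMap resizes transparently once size exceeds capacity * loadFactor.
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
```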
```java
  }
}

public void processStringExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
```
Will we have time series values as strings?
Yeah, we could, but only in the leaf stage. For instance, in M3 we would like to be able to support queries such as:

```
fetch table:foobar value:some_uuid_column
  | summarizeBy 1h countDistinct
  ...
```

This will essentially run a countDistinct in a 1-hour bucket on some_uuid_column, which is a common use-case we have internally.
This will require an extension to the current TimeSeries code though. Right now we have tied TimeSeries to Double[] values. We will make it generic in a subsequent PR; a thought-experiment sketch of a string-valued builder is below.
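As a thought experiment, a count-distinct builder over string values might look like this. BaseTimeSeriesBuilder's actual contract in this PR is Double-valued, so the shape and method names below are assumptions about the future extension, not the existing API:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a string-valued, count-distinct series builder.
// One Set per time bucket; the final series value for each bucket is the
// distinct count. This is NOT the PR's BaseTimeSeriesBuilder API.
public class CountDistinctSeriesBuilderSketch {
  private final Set<String>[] _buckets;

  @SuppressWarnings("unchecked")
  public CountDistinctSeriesBuilderSketch(int numBuckets) {
    _buckets = new Set[numBuckets];
  }

  public void addValue(int bucketIndex, String value) {
    if (_buckets[bucketIndex] == null) {
      _buckets[bucketIndex] = new HashSet<>();
    }
    _buckets[bucketIndex].add(value);
  }

  public double[] build() {
    double[] values = new double[_buckets.length];
    for (int i = 0; i < _buckets.length; i++) {
      values[i] = _buckets[i] == null ? 0 : _buckets[i].size();
    }
    return values;
  }
}
```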
```java
} else {
  instanceResponse = new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext));
  if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
    // TODO: handle invalid segments
```
What will happen now if we get invalid segments? In what cases can invalid segments reach here?
```java
_segmentsToQuery = segmentsToQuery;
_optionalSegments = null;

_timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs);
```
We should be able to use existing Pinot server metrics to troubleshoot latency or other issues in the time series engine, correct?
We will require more metrics for sure. We will pick it up next month since we need to productionize this in October.
Can we add this to the Misc task list for tracking?
Yup added here: #13957
@ankitsultana There are a couple of tests failing: https://github.com/apache/pinot/actions/runs/10860622852/job/30141445761?pr=13999. Are these related or unrelated?
```java
if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
  // TODO: handle invalid segments
  TimeSeriesBlock seriesBlock = new TimeSeriesBlock(
      queryContext.getTimeSeriesContext().getTimeBuckets(), Collections.emptyMap());
  TimeSeriesResultsBlock resultsBlock = new TimeSeriesResultsBlock(seriesBlock);
  instanceResponse = new InstanceResponseBlock(resultsBlock);
```
Shouldn't this code be in ResultsBlockUtils.buildEmptyQueryResults?
BTW, it looks like we have no tests verifying this code is called.
Yeah, we could move it there. I can raise a follow-up PR if required; a rough sketch of the move is below. Lmk @gortiz
re: tests, that's expected. We are trying to get a Quickstart working right now. Once the Part-4 PR is merged we will start raising smaller PRs with tests, bug fixes and other improvements.
Test coverage should be fixed in October.
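For reference, the follow-up could look roughly like this. This is a minimal sketch assuming ResultsBlockUtils.buildEmptyQueryResults gains a time-series branch; the types mirror the quoted diff, but the return type and the buildEmptyNonTimeSeriesResults helper are assumptions:

```java
// Hypothetical sketch: moving the empty time-series result into
// ResultsBlockUtils.buildEmptyQueryResults, so the call site loses its
// special case. Exact signatures may differ from the real utility.
public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) {
  if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
    TimeSeriesBlock seriesBlock = new TimeSeriesBlock(
        queryContext.getTimeSeriesContext().getTimeBuckets(), Collections.emptyMap());
    return new TimeSeriesResultsBlock(seriesBlock);
  }
  // ... fall through to the existing selection / aggregation / group-by handling ...
  return buildEmptyNonTimeSeriesResults(queryContext); // hypothetical helper
}
```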
Building upon #13885, this PR adds support for the Combine Operator and other segment-level operators. A test in QueryExecutorTest verifies that the combine operator works as expected. I have also updated the test dataset to make it more meaningful and added a time column to it.
The PRs are light on tests right now. Since this is rapidly evolving, I am focusing on tests that verify E2E functionality. I'll be improving the coverage significantly once a baseline implementation is working with a Quickstart. For readers following along, a sketch of the time bucketing the segment-level operators perform is below.
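A minimal sketch of that time bucketing, mapping raw time values into fixed-width bucket indexes. The method name getTimeValueIndex mirrors the quoted diff, but this body and the TimeBucketSketch class are assumptions, not the PR's implementation:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of time bucketing: map each raw time value into the
// index of its fixed-width bucket, after normalizing the stored time unit.
public class TimeBucketSketch {
  private final long _startSeconds;       // first bucket's start (inclusive)
  private final long _bucketSizeSeconds;  // width of each bucket

  public TimeBucketSketch(long startSeconds, long bucketSizeSeconds) {
    _startSeconds = startSeconds;
    _bucketSizeSeconds = bucketSizeSeconds;
  }

  public int[] getTimeValueIndex(long[] timeValues, TimeUnit storedTimeUnit) {
    int[] indexes = new int[timeValues.length];
    for (int i = 0; i < timeValues.length; i++) {
      long seconds = storedTimeUnit.toSeconds(timeValues[i]);
      indexes[i] = (int) ((seconds - _startSeconds) / _bucketSizeSeconds);
    }
    return indexes;
  }
}
```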