
Conversation

@ankitsultana
Contributor

Building upon #13885, this PR adds support for the Combine Operator and other segment-level operators. A test in QueryExecutorTest verifies that the combine operator works as expected.

I have also updated the test dataset to make it more meaningful and added a time column to it.

The PRs are light on tests right now. Since this is rapidly evolving, I am focusing on tests that verify E2E functionality. I'll improve coverage significantly once a baseline implementation is working with a Quickstart.

ankitsultana added the timeseries-engine (Tracking tag for generic time-series engine work) label Sep 14, 2024
@codecov-commenter commented Sep 14, 2024

Codecov Report

Attention: Patch coverage is 44.62151% with 139 lines in your changes missing coverage. Please review.

Project coverage is 57.89%. Comparing base (59551e4) to head (2af428f).
Report is 1043 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ator/timeseries/TimeSeriesAggregationOperator.java | 43.92% | 56 Missing and 4 partials ⚠️ |
| ...ombine/merger/TimeSeriesAggResultsBlockMerger.java | 16.00% | 21 Missing ⚠️ |
| ...inot/common/request/context/TimeSeriesContext.java | 0.00% | 16 Missing ⚠️ |
| ...erator/timeseries/TimeSeriesSelectionOperator.java | 0.00% | 8 Missing ⚠️ |
| ...sdb/spi/series/SimpleTimeSeriesBuilderFactory.java | 0.00% | 7 Missing ⚠️ |
| ...ator/timeseries/TimeSeriesPassThroughOperator.java | 0.00% | 6 Missing ⚠️ |
| ...core/query/executor/ServerQueryExecutorV1Impl.java | 14.28% | 5 Missing and 1 partial ⚠️ |
| ...perator/blocks/results/TimeSeriesResultsBlock.java | 44.44% | 5 Missing ⚠️ |
| ...org/apache/pinot/core/plan/TimeSeriesPlanNode.java | 87.87% | 1 Missing and 3 partials ⚠️ |
| ...b/spi/series/TimeSeriesBuilderFactoryProvider.java | 0.00% | 3 Missing ⚠️ |
| ... and 2 more | | |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13999      +/-   ##
============================================
- Coverage     61.75%   57.89%   -3.86%     
+ Complexity      207      199       -8     
============================================
  Files          2436     2621     +185     
  Lines        133233   143470   +10237     
  Branches      20636    22015    +1379     
============================================
+ Hits          82274    83064     +790     
- Misses        44911    53914    +9003     
- Partials       6048     6492     +444     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | 57.85% <44.62%> (-3.86%) ⬇️ |
| java-21 | 57.78% <44.62%> (-3.85%) ⬇️ |
| skip-bytebuffers-false | 57.88% <44.62%> (-3.87%) ⬇️ |
| skip-bytebuffers-true | 57.75% <44.62%> (+30.02%) ⬆️ |
| temurin | 57.89% <44.62%> (-3.86%) ⬇️ |
| unittests | 57.89% <44.62%> (-3.86%) ⬇️ |
| unittests1 | 40.67% <44.62%> (-6.22%) ⬇️ |
| unittests2 | 27.91% <0.00%> (+0.18%) ⬆️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

private final TimeUnit _timeUnit;
private final TimeBuckets _timeBuckets;
private final Long _offsetSeconds;
private final ExpressionContext _valueExpression;
Collaborator

What are the expressions allowed for valueExpression? I thought this is the name of the value column.

Contributor Author

Any "transform expression" is allowed. Transform expression in Pinot SQL is anything that is allowed in the SQL Transform/Project phase. e.g. SELECT 100*(colname/3) FROM ... is allowed in SQL, so in say M3QL, we can choose to allow:

fetch table:<table-name> value:"100*(colname/3)" | sum city_id ...

@kishoreg: you might have some ideas on how you folks can support this in PromQL.

currentTimeSeries = currentTimeSeriesList.get(0);
}
TimeSeries newTimeSeriesToMerge = entry.getValue().get(0);
if (currentTimeSeries == null) {
Collaborator

Can you please explain the currentTimeSeries == null condition? If it is null, then why are we adding a new entry?

Contributor Author

So under the combine operator we run the per-segment operators, and the combine operator merges their results across segments using the merger. It could be that a given segment yields a series (e.g. city_name='Boston') that didn't exist in any other segment.

In that case the current time series block will not have this series, and this condition will be met. That's why we don't need to merge anything here and can simply add the series to the currentTimeSeriesBlock.
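To illustrate the point, here is a minimal, self-contained sketch of that merge behavior, assuming a sum aggregation. The types and names below (SeriesSketch, mergeFrom, combine) are hypothetical stand-ins, not Pinot's actual TimeSeries or TimeSeriesAggResultsBlockMerger API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the real TimeSeries / merger classes.
public class CombineMergeSketch {
  static class SeriesSketch {
    final double[] _values; // one value per time bucket

    SeriesSketch(double[] values) {
      _values = values;
    }

    // Merge another segment's series into this one (sum aggregation assumed).
    void mergeFrom(SeriesSketch other) {
      for (int i = 0; i < _values.length; i++) {
        _values[i] += other._values[i];
      }
    }
  }

  // Results are keyed by the series' tag signature (e.g. "city_name=Boston").
  static Map<String, SeriesSketch> combine(List<Map<String, SeriesSketch>> perSegmentResults) {
    Map<String, SeriesSketch> combined = new HashMap<>();
    for (Map<String, SeriesSketch> segmentResult : perSegmentResults) {
      for (Map.Entry<String, SeriesSketch> entry : segmentResult.entrySet()) {
        SeriesSketch current = combined.get(entry.getKey());
        if (current == null) {
          // This series was not produced by any previously merged segment,
          // so there is nothing to merge: just add it to the combined block.
          combined.put(entry.getKey(), entry.getValue());
        } else {
          current.mergeFrom(entry.getValue());
        }
      }
    }
    return combined;
  }
}
```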

timeValues = applyTimeshift(_timeOffset, timeValues);
}
int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
Collaborator

We are thinking of storing labels from Prometheus messages as JSON blobs. To filter the specific label values, groupByExpression will be used. Will this work for such columns?
http_requests_total{status="200", method="GET"}

Contributor Author

Yup, this will work. Group-by can take arbitrary expressions. Though I need to rename ScanFilterAndProjectPlanNode to something like PinotLeafPlanNode, and change the groupByColumns variable in it to groupByExpressions.

Added a task here: #13957

In the quickstart I will add an example query leveraging this.
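As a rough illustration of why an arbitrary expression works here: a group-by expression is just something evaluated per document to produce a tag value, so a label pulled out of a JSON blob behaves the same as a plain column. The sketch below is not Pinot code; the naive string scan stands in for a real transform function such as jsonExtractScalar, and the column and label names are made up:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class JsonLabelGroupBySketch {
  // A "group-by expression": row -> tag value. Here it extracts one label from a
  // JSON blob stored in a hypothetical "labels" column.
  static Function<Map<String, String>, String> labelExpression(String label) {
    return row -> {
      String json = row.get("labels");
      String needle = "\"" + label + "\":\"";
      int start = json.indexOf(needle);
      if (start < 0) {
        return null;
      }
      start += needle.length();
      return json.substring(start, json.indexOf('"', start));
    };
  }

  public static void main(String[] args) {
    Map<String, String> row = new HashMap<>();
    row.put("labels", "{\"status\":\"200\",\"method\":\"GET\"}");
    // The extracted label becomes the group key, exactly like a plain column would.
    System.out.println(labelExpression("status").apply(row)); // 200
    System.out.println(labelExpression("method").apply(row)); // GET
  }
}
```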

}
int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
Collaborator

Why the hard-coded 1k length? Can it overflow?

Contributor Author

Yeah, this is the initial capacity. I set it thinking it's a reasonable number, but I don't have a benchmark to verify whether it's efficient in most scenarios. I can revert to the default ctor if required. Lmk.

Collaborator

I think it is fine, but maybe we can add a TODO to look out for this in perf runs.
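For context, a short sketch of what that 1024 actually controls (standard java.util.HashMap behavior, default 0.75 load factor assumed): it is only a sizing hint, so the map cannot overflow; it simply resizes once it fills up.

```java
import java.util.HashMap;
import java.util.Map;

public class InitialCapacitySketch {
  public static void main(String[] args) {
    // With capacity 1024 and the default load factor of 0.75, roughly 768 entries
    // fit before the first resize; the hint only avoids early rehashes when many
    // distinct series are expected per segment.
    Map<Long, Object> seriesBuilderMap = new HashMap<>(1024);
    for (long i = 0; i < 10_000; i++) {
      seriesBuilderMap.put(i, new Object()); // grows past 1024 without any issue
    }
    System.out.println(seriesBuilderMap.size()); // 10000
  }
}
```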

}
}

public void processStringExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap,
Collaborator

Will we have time series values as strings?

Contributor Author

Yeah, we could, but only in the leaf stage. For instance, in M3 we would like to be able to support queries such as:

fetch table:foobar value:some_uuid_column
  | summarizeBy 1h countDistinct
  ...

This will essentially run a countDistinct over 1-hour buckets on the some_uuid_column. This is a common use case we have internally.

This will require an extension to the current TimeSeries code though. Right now we have tied TimeSeries to Double[] values. We will make it generic in a subsequent PR.
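For concreteness, here is a tiny sketch of what a countDistinct per 1-hour bucket over a string column boils down to. This is not the Pinot BaseTimeSeriesBuilder API, and the timestamps and values are made up:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CountDistinctPerBucketSketch {
  public static void main(String[] args) {
    long bucketSizeMs = 3_600_000L; // 1h buckets, as in the summarizeBy example
    long[] timestampsMs = {0L, 10L, 3_600_010L, 3_600_020L, 3_600_030L};
    String[] uuids = {"a", "a", "b", "c", "b"};

    // Keep one set of distinct string values per time bucket; the series value for
    // a bucket is the size of its set.
    Map<Long, Set<String>> distinctPerBucket = new HashMap<>();
    for (int i = 0; i < timestampsMs.length; i++) {
      long bucket = timestampsMs[i] / bucketSizeMs;
      distinctPerBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(uuids[i]);
    }
    distinctPerBucket.forEach((bucket, values) ->
        System.out.println("bucket " + bucket + " -> " + values.size()));
    // bucket 0 -> 1 distinct value, bucket 1 -> 2 distinct values
  }
}
```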

} else {
instanceResponse = new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext));
if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
// TODO: handle invalid segments
Collaborator

What will happen now if we get invalid segments? In what cases can invalid segments reach here?

_segmentsToQuery = segmentsToQuery;
_optionalSegments = null;

_timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs);
Collaborator

We should be able to use existing Pinot server metrics to troubleshoot latency or other issues in the time series engine, correct?

Contributor Author

We will require more metrics for sure. We will pick it up next month since we need to productionize this in October.

Collaborator

Can we add this to the Misc task list for tracking?

Contributor Author

Yup added here: #13957

@raghavyadav01
Collaborator

ankitsultana merged commit 9c44ef7 into apache:master Sep 19, 2024
yashmayya mentioned this pull request Sep 23, 2024
Comment on lines +447 to +452
if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
// TODO: handle invalid segments
TimeSeriesBlock seriesBlock = new TimeSeriesBlock(
queryContext.getTimeSeriesContext().getTimeBuckets(), Collections.emptyMap());
TimeSeriesResultsBlock resultsBlock = new TimeSeriesResultsBlock(seriesBlock);
instanceResponse = new InstanceResponseBlock(resultsBlock);
Contributor

Shouldn't this code be in ResultsBlockUtils.buildEmptyQueryResults?

Contributor

BTW, it looks like we have no tests verifying this code is called.

Contributor Author

Yeah, we could move it there. I can raise a follow-up PR if required. Lmk @gortiz

re: tests, that's expected. We are trying to get a Quickstart working right now. Once the Part-4 PR is merged, we will start raising smaller PRs with tests, bug fixes, and other improvements.

Test coverage should be fixed in October.

gortiz added a commit to gortiz/pinot that referenced this pull request Sep 23, 2024
