Part-3: Working E2E Quickstart for Time Series Engine #14048
Conversation
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

@@             Coverage Diff              @@
##             master   #14048      +/-  ##
============================================
+ Coverage     61.75%   64.80%    +3.05%
- Complexity      207     1534     +1327
============================================
  Files          2436     2579      +143
  Lines       133233   141262     +8029
  Branches     20636    21640     +1004
============================================
+ Hits         82274    91542     +9268
+ Misses       44911    42975     -1936
- Partials      6048     6745      +697

Flags with carried forward coverage won't be shown.
      asyncResponse.resume(response);
    }
  } catch (Exception e) {
    LOGGER.error("Caught exception while processing POST request", e);
Where do we translate to HTTP error codes for things like invalid params or validation errors in the query planner?
In the execution part we catch exceptions and create a `PinotBrokerTimeSeriesResponse` with the error and errorType set. Right now we don't have good error categories, but it's a good point; we should converge on a standard. Added an item to the tracker: #13957
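As a rough sketch of that pattern (the `newErrorResponse` factory, the `executeQuery` helper, and the errorType strings below are assumptions for illustration; only `PinotBrokerTimeSeriesResponse` is named in this PR):

```java
// Sketch: broker-side exception-to-error-response translation.
// newErrorResponse(...) and the errorType values are hypothetical;
// the real class's API may differ.
void processQuery(String query, AsyncResponse asyncResponse) {
  try {
    asyncResponse.resume(executeQuery(query)); // executeQuery is a hypothetical helper
  } catch (IllegalArgumentException e) {
    // e.g. planner validation errors could map to a client-error category
    asyncResponse.resume(PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", e.getMessage()));
  } catch (Exception e) {
    asyncResponse.resume(PinotBrokerTimeSeriesResponse.newErrorResponse("INTERNAL_ERROR", e.getMessage()));
  }
}
```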
  @Path("timeseries/api/v1/query_range")
  @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
  @ManualAuthorization
  public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
I noticed these APIs are not part of the Swagger endpoint. Do we need to make additional changes to include them in the Swagger console in Pinot?
Are you checking the pinot-controller Swagger? I can take it as a follow-up; I almost never use the pinot-broker Swagger.
    if (StringUtils.isNotBlank(timeoutStr)) {
      timeout = HumanReadableDuration.from(timeoutStr);
    }
    // TODO: Pass full raw query param string to the request
This is handled now, correct?
Yeah good point. Will raise a PR shortly to remove this and make some other minor improvements.
    }
    try {
      return Long.parseLong(step);
    } catch (NumberFormatException ignored) {
Why are we ignoring the exception? Do we revert to the default in case of an invalid step time?
Oh, because the duration passed by the client could be something like step=10s or step=10. In the former case it's easy to know the expectation. In the latter case, we currently assume that the default unit the user is targeting is seconds.
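A sketch of that fallback behavior (assuming `HumanReadableDuration` exposes a seconds accessor; the exact method name may differ):

```java
// Sketch of the step parsing described above: a bare number like "10" is
// treated as seconds, while "10s" / "5m" parse as human-readable durations.
static long getStepSeconds(String step) {
  try {
    return Long.parseLong(step); // bare number: default unit is seconds
  } catch (NumberFormatException ignored) {
    // fall through to duration parsing for values like "10s"
  }
  return HumanReadableDuration.from(step).getSeconds(); // accessor name assumed
}
```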
    return _result;
  }

  public static Data newMatrix(List<Value> result) {
We will need to add the other Prometheus result types as well: "vector", "scalar", and "string".
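For reference, a sketch of what those additional factories might look like, mirroring the existing `newMatrix` (the `Data` constructor shape and the `Value` handling are assumptions):

```java
// Hypothetical factories for the remaining Prometheus result types,
// mirroring newMatrix(...). The Data/Value shapes are assumptions.
public static Data newVector(List<Value> result) {
  return new Data("vector", result);
}

public static Data newScalar(Value result) {
  return new Data("scalar", List.of(result));
}

public static Data newString(Value result) {
  return new Data("string", List.of(result));
}
```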
  }

  /**
   * Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution
Is there a timeout on the request in this thread, or can it hang forever?
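One generic pattern for bounding such a blocking execution with a deadline is sketched below; this is an illustration of the concern, not necessarily how the PR handles it:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Generic deadline-bounded execution; the Callable stands in for the
// blocking server-side plan execution. Illustrative, not the PR's code.
class BoundedExecution {
  static <T> T runWithDeadline(ExecutorService executor, Callable<T> task, long timeoutMs)
      throws Exception {
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the worker so it doesn't hang forever
      throw e;
    }
  }
}
```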
public class TimeSeriesExecutionContext {
  private final String _language;
  private final TimeBuckets _initialTimeBuckets;
Why "initial"? Can these TimeBuckets change while execution is happening?
Yup. The time buckets generated by the broker are used by the operators starting at the leaf stage. From there on, the operators themselves have control over the time buckets, and they can also change the granularity of the time buckets on the fly; e.g., M3 has a "summarize 1h sum" function which allows exactly that.
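As an illustration of that kind of on-the-fly re-bucketing (a hypothetical helper, not the actual operator code):

```java
// Hypothetical sketch of coarsening bucket granularity inside an operator,
// in the spirit of M3's "summarize 1h sum": sums fine-grained buckets into
// coarser ones. Assumes the output bucket size is an exact multiple.
static Double[] summarizeSum(Double[] values, long inputBucketSeconds, long outputBucketSeconds) {
  int ratio = (int) (outputBucketSeconds / inputBucketSeconds);
  Double[] result = new Double[(values.length + ratio - 1) / ratio];
  for (int i = 0; i < values.length; i++) {
    if (values[i] != null) {
      int out = i / ratio;
      result[out] = (result[out] == null) ? values[i] : result[out] + values[i];
    }
  }
  return result;
}
```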
      TimeSeriesQueryServerInstance queryServerInstance) {
    String hostname = queryServerInstance.getHostname();
    int port = queryServerInstance.getQueryServicePort();
    String key = String.format("%s_%d", hostname, port);
Is this key generation similar to the SQL query dispatcher's? I am wondering what the side effect would be if the hostname changes for a pod, especially in a Kubernetes cluster.
This is the same as the Multistage Engine, so all related constraints apply. A hostname change would involve a node restart, which will lead to a query failure anyway. Right now our Multistage queries can't keep running if any of the involved servers dies or gets restarted midway.
| "Expected exactly one table name in the logical plan, got: %s", | ||
| tableNames); | ||
| String tableName = tableNames.iterator().next(); | ||
| // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables |
To avoid this, we will need to make sure that in the write path all segments for a table go to a single server, until we implement multi-server support in phase-2. Can we capture the limitations of phase-1 in the task list?
  private final Map<Long, List<TimeSeries>> _seriesMap;

- public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
+ public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
Why is timeBuckets nullable? What happens if it is null? Is this for the instant query use case?
Yeah, this is for that, and also for the case when we have to perform partial aggregates. In that case, we might not be able to bucket the values at a fixed time granularity under the combine operator, and may have to return the time values as a Long[] in the TimeSeries instead of TimeBuckets.
|
Test https://github.com/apache/pinot/actions/runs/10973640885/job/30479986608?pr=14048 is failing, though it does not look to be related.
|
Thanks folks for the quick review. Will be raising smaller PRs going forward for incremental improvements and bug fixes.
  @Override
  public TimeSeriesBlock getNextBlock() {
    TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
    seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
Are we sure we want to use parallel streams here? What is the advantage? Assuming we have high enough QPS, I can only see queries competing with each other for these limited threads.
This entire package will be rewritten in the next few days, and you are right, we shouldn't use it. I have to go through a small internal review process before I can share the actual M3 plugin implementation, which doesn't have any of these hacks.
    // multi-stage request handler uses both Netty and GRPC ports.
    // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
    // TODO: decouple protocol and engine selection.
    queryDispatcher = createQueryDispatcher(_brokerConf);
Hey @ankitsultana, I noticed this creates a new query dispatcher even though the MultiStageBrokerRequestHandler creates its own: https://github.com/apache/pinot/blob/master/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java#L174. Was there a reason for that?
Yeah, the reason was that QueryDispatcher has in-memory state, so we wanted them to be decoupled. Jackie just merged a PR that creates a separate class for the Time Series dispatcher: #17474
Last PR to get us to a working Quickstart state. The PRs after this will be much smaller. Explaining the changes at a high level below:

Adds a Prometheus-Like Pinot Broker HTTP API

Adds two new APIs to the broker: one for range queries and one for instant-vector queries. The API is Prometheus compatible, except that the top-level path is prefixed with `timeseries/api`.

Changes to Receive a Time Series Query Request in Broker and Send to Server

After the broker receives the query, we use the new `TimeSeriesRequestHandler`, which uses the `pinot-timeseries-planner` module's `TimeSeriesQueryEnvironment` to plan the query. The plan is finally dispatched via the `QueryDispatcher`. The implementation mimics the MSE very closely, but of course the exact details are quite different.

Code duplication: I had to create the dispatch-related classes again (`AsyncTimeSeriesDispatchClient` and the like); ideally we should just use their MSE equivalents, but that will require us to consolidate more stuff, and I plan to take it up as part of phase-2.

Executing Received Plan in Server

Once the plan is received in the server, we run it on the `QueryRunner`'s `ExecutorService`. The same executor service is used by `OpChainSchedulerService` as well.

Converting ScanFilterAndProject to TimeSeriesPhysicalTableScan

This is done as part of the plan compilation in the server, because we need to generate the `LeafTimeSeriesOperator`, which is a `pinot-query-runtime` construct, and `ScanFilterAndProject` only has a dependency on `pinot-spi`.
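A rough sketch of that rewrite step (all node and method shapes here are assumptions; only the class names `ScanFilterAndProject`, `LeafTimeSeriesOperator`, and `TimeSeriesPhysicalTableScan` come from this PR):

```java
// Hypothetical sketch: during server-side compilation, swap the SPI-level
// ScanFilterAndProject node for a physical scan wrapping the runtime
// LeafTimeSeriesOperator. All signatures here are assumptions.
BaseTimeSeriesPlanNode toPhysicalPlan(BaseTimeSeriesPlanNode node, TimeSeriesExecutionContext context) {
  if (node instanceof ScanFilterAndProjectPlanNode) {
    LeafTimeSeriesOperator leafOperator = buildLeafOperator((ScanFilterAndProjectPlanNode) node, context);
    return new TimeSeriesPhysicalTableScan(node.getId(), leafOperator);
  }
  // recurse into children for all other node types
  List<BaseTimeSeriesPlanNode> newChildren = new ArrayList<>();
  for (BaseTimeSeriesPlanNode child : node.getChildren()) {
    newChildren.add(toPhysicalPlan(child, context));
  }
  return node.withChildren(newChildren); // assumed copy-with-children API
}
```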
Dummy M3QL Implementation
For now I have added a dummy M3QL implementation so we can play with the Quickstart.
Instructions for Starting Quickstart
You can start the "TIME_SERIES" quick start, wait a minute or so for some data to be populated, and then run:
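The example command was truncated in this copy. As a hedged stand-in, a range query against the new endpoint could look like the following; the broker port, the `language` parameter name, the time range, and the placeholder M3QL query string are all assumptions, and only the `timeseries/api/v1/query_range` path comes from this PR:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Issues a Prometheus-style range query against the broker's new endpoint.
// Host/port, language value, query string, and time range are illustrative.
public class TimeSeriesQuickstartQuery {
  public static void main(String[] args) throws Exception {
    String query = URLEncoder.encode("fetch ...", StandardCharsets.UTF_8); // placeholder M3QL
    long end = System.currentTimeMillis() / 1000;
    long start = end - 300; // last 5 minutes
    String url = String.format(
        "http://localhost:8000/timeseries/api/v1/query_range?language=m3ql&query=%s&start=%d&end=%d&step=10s",
        query, start, end);
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(URI.create(url)).GET().build(),
            HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}
```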