@ankitsultana ankitsultana commented Sep 20, 2024

Last PR to get us to a working Quickstart state. The PRs after this will be much smaller. Explaining the changes at a high-level below:

Adds a Prometheus-like Pinot Broker HTTP API

Adds two new APIs to the broker: one for range queries and one for instant-vector queries. The APIs are Prometheus-compatible, except that the top-level path is prefixed with timeseries/api.
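For illustration, requests to the two endpoints could be built as follows. The query_range path matches the quickstart script below; the instant-query path /v1/query is an assumption by analogy with Prometheus's API, as is the sample query string.

```python
import time
import urllib.parse

BASE = "http://localhost:8000/timeseries/api"

# Range query: evaluates the expression over [start, end].
range_params = urllib.parse.urlencode({
    "language": "m3ql",
    "query": 'fetch{table="meetupRsvp_REALTIME"}',  # illustrative query
    "start": int(time.time()) - 3600,
    "end": int(time.time()),
})
range_url = f"{BASE}/v1/query_range?{range_params}"

# Instant query: evaluates the expression at a single point in time.
# (Path assumed by analogy with Prometheus's /api/v1/query.)
instant_params = urllib.parse.urlencode({
    "language": "m3ql",
    "query": 'fetch{table="meetupRsvp_REALTIME"}',
})
instant_url = f"{BASE}/v1/query?{instant_params}"

print(range_url)
print(instant_url)
```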

Changes to Receive a Time Series Query Request in Broker and Send to Server

After the broker receives the query, the new TimeSeriesRequestHandler uses the pinot-timeseries-planner module's TimeSeriesQueryEnvironment to plan the query. The plan is then dispatched via the QueryDispatcher. The implementation mimics the MSE very closely, though the exact details differ quite a bit.

Code Duplication: I had to create the dispatch-related classes again (AsyncTimeSeriesDispatchClient and the like). Ideally we should just use their MSE equivalents, but that will require us to consolidate more code, which I plan to take up as part of phase-2.

Executing Received Plan in Server

Once the plan is received in the server, we:

  1. Deserialize the plan and compile the plan-tree to an operator-tree
  2. Run the operator tree in QueryRunner's ExecutorService. The same executor service is used by OpChainSchedulerService as well.
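The two steps above can be sketched roughly as follows. Python is used for brevity; PlanNode, Operator, compile_plan, and run_plan are illustrative stand-ins, not the actual Pinot classes.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

@dataclass
class PlanNode:
    """Illustrative stand-in for a node in the serialized plan tree."""
    kind: str
    children: list = field(default_factory=list)

class Operator:
    """Illustrative stand-in for a node in the compiled operator tree."""
    def __init__(self, kind, child_ops):
        self.kind = kind
        self.child_ops = child_ops

    def next_block(self):
        # A real operator would pull blocks from its children and transform them.
        return {"operator": self.kind,
                "inputs": [c.next_block() for c in self.child_ops]}

def compile_plan(node: PlanNode) -> Operator:
    # Step 1: recursively compile the plan tree into an operator tree.
    return Operator(node.kind, [compile_plan(c) for c in node.children])

def run_plan(plan: PlanNode, executor: ThreadPoolExecutor):
    # Step 2: run the operator tree on the shared executor
    # (in Pinot, the same ExecutorService used by OpChainSchedulerService).
    root = compile_plan(plan)
    return executor.submit(root.next_block)

plan = PlanNode("MAX", [PlanNode("LEAF_SCAN")])
with ThreadPoolExecutor(max_workers=2) as pool:
    result = run_plan(plan, pool).result()
print(result)
```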
Converting ScanFilterAndProject to TimeSeriesPhysicalTableScan

This is done as part of the plan compilation in the server, because we need to generate the LeafTimeSeriesOperator, which is a pinot-query-runtime construct, while ScanFilterAndProject only has a dependency on pinot-spi.
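A minimal sketch of that conversion, assuming illustrative class names rather than the actual Pinot types:

```python
class ScanFilterAndProject:
    """Logical leaf node; in Pinot this lives in the SPI layer."""
    def __init__(self, table):
        self.table = table
        self.children = []

class TimeSeriesPhysicalTableScan:
    """Physical leaf node; in Pinot this lives in the query-runtime layer."""
    def __init__(self, table):
        self.table = table
        self.children = []

def to_physical(node):
    """Recursively rewrite logical scan leaves into physical table scans."""
    if isinstance(node, ScanFilterAndProject):
        return TimeSeriesPhysicalTableScan(node.table)
    node.children = [to_physical(c) for c in node.children]
    return node

leaf = ScanFilterAndProject("meetupRsvp_REALTIME")
physical = to_physical(leaf)
print(type(physical).__name__, physical.table)
```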

Dummy M3QL Implementation

For now I have added a dummy M3QL implementation so we can play with the Quickstart.

Instructions for Starting Quickstart

You can start the "TIME_SERIES" quick start, wait a minute or so for some data to be populated, and then run:

➜  ~ cat script.py
import time
import urllib.parse

import requests


with open('query.txt', 'r') as f:
    query = f.read()
    request = {
        'language': 'm3ql',
        'start': int(time.time()),
        'end': int(time.time()) + 3600,
        'query': query,
    }
    # urllib.urlencode is Python 2; Python 3 moved it to urllib.parse.urlencode
    query_params = urllib.parse.urlencode(request)
    resp = requests.get('http://localhost:8000/timeseries/api/v1/query_range?' + query_params)
    print(resp.text)

➜  ~ cat query.txt
fetch{table="meetupRsvp_REALTIME",filter="",ts_column="__metadata$recordTimestamp",ts_unit="MILLISECONDS",value="1"}
  | max{group_city}
  | transformNull{0}
  | keepLastValue{}

codecov-commenter commented Sep 20, 2024

Codecov Report

Attention: Patch coverage is 2.19124% with 491 lines in your changes missing coverage. Please review.

Project coverage is 64.80%. Comparing base (59551e4) to head (8b15367).
Report is 1101 commits behind head on master.

Files with missing lines Patch % Lines
...pinot/tsdb/planner/TimeSeriesQueryEnvironment.java 0.00% 62 Missing ⚠️
...roker/requesthandler/TimeSeriesRequestHandler.java 0.00% 52 Missing ⚠️
...common/response/PinotBrokerTimeSeriesResponse.java 0.00% 51 Missing ⚠️
.../pinot/query/service/dispatch/QueryDispatcher.java 3.92% 49 Missing ⚠️
...va/org/apache/pinot/query/runtime/QueryRunner.java 0.00% 43 Missing and 1 partial ⚠️
...time/timeseries/PhysicalTimeSeriesPlanVisitor.java 0.00% 40 Missing ⚠️
.../pinot/tsdb/planner/physical/TableScanVisitor.java 0.00% 36 Missing ⚠️
...ache/pinot/common/utils/HumanReadableDuration.java 0.00% 32 Missing ⚠️
...pinot/broker/api/resources/PinotClientRequest.java 0.00% 19 Missing ⚠️
...ery/runtime/timeseries/LeafTimeSeriesOperator.java 0.00% 14 Missing ⚠️
... and 14 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14048      +/-   ##
============================================
+ Coverage     61.75%   64.80%   +3.05%     
- Complexity      207     1534    +1327     
============================================
  Files          2436     2579     +143     
  Lines        133233   141262    +8029     
  Branches      20636    21640    +1004     
============================================
+ Hits          82274    91542    +9268     
+ Misses        44911    42975    -1936     
- Partials       6048     6745     +697     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 64.76% <2.19%> (+3.05%) ⬆️
java-21 64.69% <2.19%> (+3.06%) ⬆️
skip-bytebuffers-false 64.79% <2.19%> (+3.04%) ⬆️
skip-bytebuffers-true 64.65% <2.19%> (+36.93%) ⬆️
temurin 64.80% <2.19%> (+3.05%) ⬆️
unittests 64.79% <2.19%> (+3.05%) ⬆️
unittests1 56.28% <0.72%> (+9.39%) ⬆️
unittests2 34.90% <1.99%> (+7.17%) ⬆️

Flags with carried forward coverage won't be shown.


@ankitsultana changed the title from "[WIP] Part-4: Working E2E Quickstart for Time Series Engine" to "Part-4: Working E2E Quickstart for Time Series Engine" Sep 21, 2024
asyncResponse.resume(response);
}
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
Collaborator

Where do we translate errors like an invalid param or a validation error in the query planner into HTTP error codes?

Contributor Author

In the execution part we catch exception, and create a "PinotBrokerTimeSeriesResponse" with the error and errorType set. Right now we don't have good error categories but it's a good point, we should converge on a standard. Added an item to the tracker #13957

@Path("timeseries/api/v1/query_range")
@ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
@ManualAuthorization
public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
Collaborator

I noticed these APIs are not part of the Swagger endpoint. Do we need to make additional changes to include them in the Swagger console in Pinot?

Contributor Author

Are you checking the pinot-controller Swagger? I can take it as a follow-up; I almost never use the pinot-broker Swagger.

if (StringUtils.isNotBlank(timeoutStr)) {
timeout = HumanReadableDuration.from(timeoutStr);
}
// TODO: Pass full raw query param string to the request
Collaborator

This is handled now, correct?

Contributor Author

Yeah good point. Will raise a PR shortly to remove this and make some other minor improvements.

}
try {
return Long.parseLong(step);
} catch (NumberFormatException ignored) {
Collaborator

Why are we ignoring the exception? Will we revert to the default in case of an invalid step time?

Contributor Author

Oh, because the duration passed by the client could be something like step=10s or step=10. In the former case it's easy to know the expectation; in the latter case, we currently assume the default unit the user is targeting is seconds.
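That behavior could be sketched like this; parse_step is a hypothetical helper for illustration, not the actual HumanReadableDuration API.

```python
import re

# Units a step string may carry; a bare number falls back to seconds.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_step(step: str) -> int:
    """Return the step size in seconds, defaulting bare numbers to seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", step)
    if match:
        value, unit = match.groups()
        return int(value) * _UNIT_SECONDS[unit]
    # No unit given: assume the user means seconds.
    return int(step)

print(parse_step("10s"), parse_step("10"), parse_step("2m"))
```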

return _result;
}

public static Data newMatrix(List<Value> result) {
Collaborator

We will need to add the other Prometheus types as well: "vector", "scalar", and "string".
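For reference, these are the four result shapes the Prometheus HTTP API defines. The helper below is only a sketch of the common response envelope, not Pinot's actual response builder.

```python
import json

def prometheus_response(result_type, result):
    """Common envelope shared by all Prometheus query responses."""
    return {"status": "success",
            "data": {"resultType": result_type, "result": result}}

# matrix: range vectors, one (timestamp, value) list per series.
matrix = prometheus_response("matrix", [
    {"metric": {"group_city": "Seattle"},
     "values": [[1726800000, "42"], [1726800060, "45"]]},
])
# vector: instant vectors, one (timestamp, value) pair per series.
vector = prometheus_response("vector", [
    {"metric": {"group_city": "Seattle"}, "value": [1726800000, "42"]},
])
# scalar and string: a single (timestamp, value) pair.
scalar = prometheus_response("scalar", [1726800000, "42"])
string_result = prometheus_response("string", [1726800000, "hello"])

print(json.dumps(matrix, indent=2))
```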

}

/**
* Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution
Collaborator

Is there a timeout on the request in this thread, or can it hang forever?


public class TimeSeriesExecutionContext {
private final String _language;
private final TimeBuckets _initialTimeBuckets;
Collaborator

Why "initial"? Can these TimeBuckets change while execution is happening?

Contributor Author

Yup. The time buckets generated by the broker are used by the operators starting at the leaf stage.

From there on, the operators themselves have control over the time buckets and can also change their granularity on the fly; e.g. M3's "summarize 1h sum" function allows exactly that.
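As a rough sketch of what re-granularization in the spirit of "summarize 1h sum" does (the function name and signature are illustrative, not Pinot's or M3's API):

```python
def summarize_sum(bucket_starts, values, old_step, new_step):
    """Re-bucket a (start, value) series from old_step to new_step by summing.

    Returns sorted (new_bucket_start, summed_value) pairs.
    """
    assert new_step % old_step == 0, "new granularity must be a multiple"
    out = {}
    for start, value in zip(bucket_starts, values):
        # Align each old bucket to the start of its enclosing new bucket.
        new_start = start - (start % new_step)
        out[new_start] = out.get(new_start, 0) + value
    return sorted(out.items())

# Four 15-minute buckets collapse into one 1-hour bucket.
starts = [0, 900, 1800, 2700]
values = [1.0, 2.0, 3.0, 4.0]
print(summarize_sum(starts, values, old_step=900, new_step=3600))
```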

TimeSeriesQueryServerInstance queryServerInstance) {
String hostname = queryServerInstance.getHostname();
int port = queryServerInstance.getQueryServicePort();
String key = String.format("%s_%d", hostname, port);
Collaborator

Is this key generation similar to the SQL query dispatcher? I am wondering what the side effect would be if the hostname changes for a pod, especially in a Kubernetes cluster.

Contributor Author

This is the same as the multistage engine, so all related constraints apply. A hostname change involves a node restart, which will lead to a query failure anyway. Right now our multistage queries can't keep running if any of the involved servers dies or gets restarted midway.

"Expected exactly one table name in the logical plan, got: %s",
tableNames);
String tableName = tableNames.iterator().next();
// Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables
Collaborator

To avoid this, we will need to make sure in the write path that all segments for a table go to a single server, until we implement multi-server support in phase 2. Can we capture the limitations of phase 1 in the task list?

private final Map<Long, List<TimeSeries>> _seriesMap;

public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
Collaborator

Why is TimeBuckets nullable? What will happen if it is null? Is this for the instant-query use case?

Contributor Author

Yeah, this is for that, and also for the case when we have to perform partial aggregates. In that case, we might not be able to bucket the values at a time granularity under the combine operator, and may have to instead return time values as a Long[] instead of TimeBuckets in the TimeSeries.

@raghavyadav01 (Collaborator)

@ankitsultana changed the title from "Part-4: Working E2E Quickstart for Time Series Engine" to "Part-3: Working E2E Quickstart for Time Series Engine" Sep 24, 2024
@ankitsultana ankitsultana added the timeseries-engine Tracking tag for generic time-series engine work label Sep 24, 2024
@ankitsultana ankitsultana merged commit c395d09 into apache:master Sep 24, 2024
@ankitsultana (Contributor Author)

Thanks, folks, for the quick review. I will be raising smaller PRs going forward for incremental improvements and bug fixes.

@Override
public TimeSeriesBlock getNextBlock() {
TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
Contributor

Are we sure we want to use parallel streams here? What is the advantage? Assuming we have high enough QPS, I can only see queries competing with each other for these limited threads.

Contributor Author

This entire package will be overwritten in the next few days, and you are right we shouldn't use it.

I have to go through a small review process internally before I can share the actual M3 Plugin implementation which doesn't have any of these hacks.

// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
Contributor

Contributor Author

Yeah, the reason was that QueryDispatcher has in-memory state, so we wanted them to be decoupled.

Jackie just merged a PR that creates a separate class for the Time Series dispatcher: #17474
