Skip to content

Conversation

@ankitsultana
Copy link
Contributor

Summary

Completes the first E2E working version of the new Physical Optimizer. There's quite a bit of refactoring and feature porting that still needs to be done in order to make this usable generally, and I am hoping to spend May doing that.

Maintaining Request Id in Context

The current QueryEnvironment I feel is quite bloated and I know several folks have also raised this. For now I have added the requestId to QueryEnvironment#Config. This eliminates passing the requestId across method calls like toDispatchableSubPlan so I think it is overall still a relatively clean approach.

Determining When Physical Optimizer is Enabled

I have added a new query option, that is temporary: usePhysicalOptimizer=true/false. By default this is assumed false. When we use the physical optimizer, we need to skip certain sections of the HepProgram and I have made those changes accordingly.

Plan Fragmenter and Mailbox Assignment

This does the following:

  1. Converts PRelNode tree to PlanNode tree.
  2. Creates plan fragments
  3. Assigns Mailbox and sets them in the Dispatchable Plan Metadata

This is not unit tested right now and I am working on a follow-up PR to add unit tests for this and some other parts of the code.

PinotLogicalQueryPlanner changes

I think we really need to clean up some of these classes. e.g. PinotLogicalQueryPlanner creates the dispatchable plan which is misleading.

I hope to do this incrementally. First, I'll backport the missing features from the existing MSE Optimizer, then work on adding sufficient testing to make sure the new optimizer can be made the main optimizer, and then work on cleaning up the rest of the optimizer structure and removing the old optimizer code.

My hope is to wrap all of this up in H1.

Test Plan

Unit Tests

Have added Unit Test cases which have decent coverage. Check the JSON Plan output file.

Cluster Testing

We have tested this in our cluster and even for some of the simple query shapes on low amount of data, the perf improvement is around 2x, but that may be solely because we don't yet support semi-join dynamic filters in the Physical Optimizer. We'll be running more benchmarks this month to compare the difference between the old and the new optimizer on one of our bigger clusters.

@ankitsultana ankitsultana added multi-stage Related to the multi-stage query engine mse-physical-optimizer labels May 2, 2025
@codecov-commenter
Copy link

codecov-commenter commented May 2, 2025

Codecov Report

Attention: Patch coverage is 88.62559% with 24 lines in your changes missing coverage. Please review.

Project coverage is 63.18%. Comparing base (1a476de) to head (b01fb52).
Report is 22 commits behind head on master.

Files with missing lines Patch % Lines
.../physical/v2/PlanFragmentAndMailboxAssignment.java 90.41% 5 Missing and 9 partials ⚠️
.../java/org/apache/pinot/query/QueryEnvironment.java 84.61% 2 Missing and 4 partials ⚠️
...requesthandler/MultiStageBrokerRequestHandler.java 0.00% 2 Missing ⚠️
...he/pinot/query/context/PhysicalPlannerContext.java 33.33% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15698      +/-   ##
============================================
+ Coverage     62.90%   63.18%   +0.28%     
- Complexity     1386     1388       +2     
============================================
  Files          2867     2873       +6     
  Lines        163354   164212     +858     
  Branches      24952    25124     +172     
============================================
+ Hits         102755   103764    +1009     
+ Misses        52847    52605     -242     
- Partials       7752     7843      +91     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.16% <88.62%> (+0.29%) ⬆️
java-21 63.16% <88.62%> (+0.34%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.18% <88.62%> (+0.28%) ⬆️
unittests 63.18% <88.62%> (+0.28%) ⬆️
unittests1 56.38% <89.47%> (+0.56%) ⬆️
unittests2 33.32% <5.68%> (-0.25%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@siddharthteotia
Copy link
Contributor

Do we have data points on overhead of optimizer itself ?

@ankitsultana
Copy link
Contributor Author

Not yet, but I hope to run it at some point. FWIW I don't expect this new optimizer to add any new overheads of its own.

Startree folks have seen some bottlenecks from how Calcite handles some things like large IN list, but we haven't yet tested MSE for such use cases yet.

@siddharthteotia
Copy link
Contributor

siddharthteotia commented May 3, 2025

Apologies if these questions have been discussed earlier. In that case, you can just point to the discussion or doc.

I have a couple of questions out of curiosity --

  • Are we primarily trying to optimize the JOIN ordering or broad set of queries ?

  • Have we looked at LOptJoinOptimizeRule in Calcite ?

  • I don't see a cost factory. We also don't have a stats store. So is this rule based or cost based ?

FWIW I don't expect this new optimizer to add any new overheads of its own.

I think it really depends on the query complexity. In my experience, sophisticated query planning can easily go upto 100s of ms or even upto 1sec but then then it could be worth it if still optimizes the execution.

I am very interested to know what are the kinds of queries for which you are seeing performance boost with the physical optimizer.

that is temporary: usePhysicalOptimizer=true/false. By default this is assumed false. When we use the physical optimizer, we need to skip certain sections of the HepProgram and I have made those changes

Why is this needed? I am hoping that even in the first working version of the optimizer, there is no query option otherwise we are just going to create a lot of tech debt / conditional paths where people will inevitably bypass it and create their own niche optimizations in the OSS code.

Overall I think this is a step in the right direction. It will make Pinot better and help make it feature complete. However, optimizer is a long journey (longer than execution) with lots of consistent non-trivial optimizations/revisions. I suggest we invest systematically with a long term direction which I am not sure what that is. If there is a doc, I would love to see it.

@ankitsultana
Copy link
Contributor Author

Hey Siddharth, this is the same workstream I had discussed with you in our 1:1 a few months ago, and the design doc is here: https://docs.google.com/document/d/17ApZbvNphKgEdSAOlZwTwAnnL_dAt9QbjcpzjHb4M0w/edit?tab=t.0#heading=h.txhamf1ek7k

The overall goal is to restructure the query optimizer to improve Exchange Simplification and make it possible to add the MSE Lite Mode I had proposed here: #14640

Adding a cost model and doing join reordering is a separate topic and this effort doesn't impede that. We might actually contribute the same ourselves in a few months.

I am very interested to know what are the kinds of queries for which you are seeing performance boost with the physical optimizer.

Key optimizations that this brings are (there are more.. I'll highlight them in the doc once we can run more tests on our clusters):

  1. Full query plans for eligible queries can be run without any cross worker exchange, regardless of the number of joins involved in the query. (see examples below)
  2. Tables involved in a join will no longer need to have the same number of partitions. E.g. you can have a fact table with 256 partitions and a smaller table with 16 partitions, and we'll still be able to avoid cross worker exchange whenever possible.
  3. For our internal use-cases, we see anywhere between 2-5x improvement for query performance. I'll try to share detailed results once I propose removing the query option and the old query optimizer path (ETA: June).
  4. All of the above optimizations will run without any requirement for hints.

Some examples:

-- this entire query will run without any cross worker data transfer.
SELECT deviceOS, userUUID FROM userAttributes
  WHERE userUUID IN (SELECT userUUID FROM userGroups WHERE groupUUID = 'group-1')
  AND userUUID NOT IN (SELECT userUUID FROM userGroups WHERE groupUUID = 'group-2')
  ... any number of IN/Not-In clauses
-- this query will also run without any shuffles, except the last Singleton exchange for computing COUNT(*)
WITH tmp AS (
  SELECT userUUID
  FROM userAttributes
    WHERE deviceOS = ''
  UNION ALL
  SELECT userUUID
  FROM userAttributes
  WHERE deviceOS IN ('ios')
)
SELECT COUNT(*)
FROM userAttributes
WHERE userUUID IN (
	SELECT userUUID FROM userAttributes WHERE deviceOS IN ('ios')
  )
  AND userUUID NOT IN (
	SELECT userUUID FROM userAttributes WHERE deviceOS IN ('foo')
  )
  ... any number of joins on the partitioning key
  AND userUUID IN (
    SELECT userUUID FROM tmp
  )

Why is this needed? I am hoping that even in the first working version of the optimizer, there is no query option otherwise we are just going to create a lot of tech debt / conditional paths where people will inevitably bypass it and create their own niche optimizations in the OSS code.

Completely agree. The reason this is needed is because the existing query optimizer has a lot of features that we need to port to the new optimizer flow. The idea is that we'll soon backport all features of the existing query optimizer, and then remove the old code path. I also want to give other companies time to test the new optimizer path out and run their own A/B tests in case they want to test the perf difference that the new optimizer brings to their workloads.

@gortiz
Copy link
Contributor

gortiz commented May 5, 2025

The current QueryEnvironment I feel is quite bloated and I know several folks have also raised this. For now I have added the requestId to QueryEnvironment#Config. This eliminates passing the requestId across method calls like toDispatchableSubPlan so I think it is overall still a relatively clean approach.

You don't need to do that. Instead, you can rely on QueryThreadContext.getRequestId(), which should be set for all threads running this code in both servers and brokers. Controllers calling QueryEnvironment (ie to obtain the list of tables being used) may need to initiate the QueryThreadContext.

Remember we also introduced QueryThreadContext.getCid(), which, contrary to requestId, is stable once set. I didn't remove old usages of request id because I didn't want to make refactors more complex, but if you have to do it now, it is probably better to use QueryThreadContext.

@ankitsultana
Copy link
Contributor Author

@gortiz : thanks for sharing. I'll raise another one after this to address this. As for the cleanup, I wanted to make a change which explicitly tracks whether the QueryEnvironment usage is in the controller or the broker. Right now I think we implicitly infer this by checking whether envConfig.getWorkerManager() is non-null. Lmk if that assumption is correct.

Ideally I'd like to take a stab at the TODO shown below but I think it's best to do it once I mainline the physical optimizer.

image

Copy link
Contributor

@itschrispeck itschrispeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good from my side given the execution plan shared

"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' LIMIT 10, 11",
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[10], fetch=[11])",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For unsorted queries, we can consider fetch directly and not do offsets since a high offset value can result in high memory usage for high offset values but low limit values.

Copy link
Collaborator

@shauryachats shauryachats left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ankitsultana ankitsultana merged commit a231dbb into apache:master May 5, 2025
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

mse-physical-optimizer multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants