[Multi-stage] Optimize query dispatch #12358
Jackie-Jiang
commented
Feb 3, 2024
- Perform the server-independent serialization only once per stage
- Send one plan per server instead of one plan per stage per server
- Execute the server-dependent serialization in parallel
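The three changes above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `DispatchSketch`, `serializeSharedPart`, `serializeServerPart`, and `buildRequests` are hypothetical names, and plain strings stand in for the serialized plan payloads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these names are Pinot's actual API.
final class DispatchSketch {
  // Counts how often the server-independent part is serialized.
  static final AtomicInteger SHARED_SERIALIZATIONS = new AtomicInteger();

  // Stand-in for serializing the server-independent part of a stage.
  static String serializeSharedPart(int stageId) {
    SHARED_SERIALIZATIONS.incrementAndGet();
    return "stage-" + stageId;
  }

  // Stand-in for attaching the server-dependent part (assumed to be cheap per server).
  static String serializeServerPart(String shared, String server) {
    return shared + "@" + server;
  }

  // Builds one request per server; each request covers every stage assigned to that server.
  static Map<String, List<String>> buildRequests(Map<Integer, List<String>> stageToServers,
      Executor executor) {
    // 1. Serialize the server-independent part once per stage.
    Map<Integer, String> sharedByStage = new LinkedHashMap<>();
    for (Integer stageId : stageToServers.keySet()) {
      sharedByStage.put(stageId, serializeSharedPart(stageId));
    }
    // 2. Group stages by server so that each server receives a single request.
    Map<String, List<Integer>> stagesByServer = new LinkedHashMap<>();
    for (Map.Entry<Integer, List<String>> entry : stageToServers.entrySet()) {
      for (String server : entry.getValue()) {
        stagesByServer.computeIfAbsent(server, k -> new ArrayList<>()).add(entry.getKey());
      }
    }
    // 3. Run the server-dependent serialization in parallel, one task per server.
    Map<String, CompletableFuture<List<String>>> futures = new LinkedHashMap<>();
    for (Map.Entry<String, List<Integer>> entry : stagesByServer.entrySet()) {
      String server = entry.getKey();
      List<Integer> stages = entry.getValue();
      futures.put(server, CompletableFuture.supplyAsync(() -> {
        List<String> request = new ArrayList<>();
        for (Integer stageId : stages) {
          request.add(serializeServerPart(sharedByStage.get(stageId), server));
        }
        return request;
      }, executor));
    }
    // Wait for every per-server task and collect the results.
    Map<String, List<String>> requests = new LinkedHashMap<>();
    for (Map.Entry<String, CompletableFuture<List<String>>> entry : futures.entrySet()) {
      requests.put(entry.getKey(), entry.getValue().join());
    }
    return requests;
  }
}
```

In this sketch the shared map is fully built before any task is submitted and is only read afterwards, so the parallel tasks need no extra synchronization.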
Codecov Report
@@ Coverage Diff @@
## master #12358 +/- ##
============================================
- Coverage 61.71% 61.68% -0.04%
+ Complexity 1153 207 -946
============================================
Files 2424 2426 +2
Lines 132512 132674 +162
Branches 20481 20502 +21
============================================
+ Hits 81782 81839 +57
- Misses 44732 44831 +99
- Partials 5998 6004 +6
walterddr
left a comment
looks good to me.
for (QueryServerInstance serverInstance : serverInstances) {
  _executorService.submit(() -> {
Note that QueryServer could also apply a similar technique:
pinot/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
Lines 108 to 123 in 0a43986
try {
  distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
} catch (Exception e) {
  LOGGER.error("Caught exception while deserializing the request: {}", requestId, e);
  responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
  return;
}
// 2. Submit distributed stage plans, await response successful or any failure which cancels all other tasks.
int numSubmission = distributedStagePlans.size();
CompletableFuture<?>[] submissionStubs = new CompletableFuture[numSubmission];
for (int i = 0; i < numSubmission; i++) {
  DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
  submissionStubs[i] =
      CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadata),
          _querySubmissionExecutorService);
}
The deserialization (line 109) can be moved into the runAsync call (line 121).
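A rough sketch of what that suggestion could look like, assuming each stage plan can be deserialized independently of the others. `ParallelDeserializeSketch` and `submitAll` are hypothetical names, and the generic parameters stand in for the serialized request payload and the stage plan type; this is not the actual QueryServer code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: R is a serialized stage plan, P is the deserialized plan.
final class ParallelDeserializeSketch {
  static <R, P> void submitAll(List<R> serializedPlans, Function<R, P> deserialize,
      Consumer<P> process, Executor executor) {
    CompletableFuture<?>[] submissionStubs = new CompletableFuture<?>[serializedPlans.size()];
    for (int i = 0; i < submissionStubs.length; i++) {
      R serialized = serializedPlans.get(i);
      // Deserialization now runs inside the async task, next to the processing step.
      submissionStubs[i] =
          CompletableFuture.runAsync(() -> process.accept(deserialize.apply(serialized)), executor);
    }
    // Wait for all tasks; a failure in any task surfaces here as a CompletionException.
    CompletableFuture.allOf(submissionStubs).join();
  }
}
```

One consequence of this layout is that a deserialization failure would no longer be caught before submission, so the INVALID_ARGUMENT handling shown above would have to be mapped from the task failure instead.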
Good point! Will do it as a separate PR