-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add OOM Protection Support for Multi-Stage Queries #13598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13598 +/- ##
============================================
- Coverage 61.75% 57.81% -3.95%
+ Complexity 207 197 -10
============================================
Files 2436 2587 +151
Lines 133233 142508 +9275
Branches 20636 21888 +1252
============================================
+ Hits 82274 82386 +112
- Misses 44911 53667 +8756
- Partials 6048 6455 +407
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
f8d844a to
fb681c3
Compare
06d32bd to
e4f421a
Compare
jasperjiaguo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still reviewing a few files will finish by this week
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
Outdated
Show resolved
Hide resolved
| if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2 | ||
| hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable)); | ||
| } | ||
| hashCollection.add(row); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we considered the option to do sampleAndCheckInterruptionPeriodically() every X rows in this loop? Similar for some other operators. If the rightBlock size itself is large and the operation itself is mem-heavy (e.g. heap, hashmap add) the OOM may happen within a block if we do not sample at a finer granularity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did look into that. I chose to check for every block for a couple of reasons.
- The block size is ~10000 rows which is very similar to the constant - 8192.
- It was easier to call this function after every block and not worry about specific if-else & try-catch in every implementation.
In this specific case too, the hash table is being built and sample/interruption will run every 10000 rows.
| @@ -0,0 +1,246 @@ | |||
| /** | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to add a test case of query being killed in MSE execution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the trheading model is different, I couldnt make those tests work. I'll give it a try again.
015d0bf to
214d5bb
Compare
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
Show resolved
Hide resolved
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
Show resolved
Hide resolved
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
Show resolved
Hide resolved
...ntime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
Outdated
Show resolved
Hide resolved
...ntime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
Show resolved
Hide resolved
...-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
Outdated
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
Outdated
Show resolved
Hide resolved
214d5bb to
59e3caf
Compare
...query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
Show resolved
Hide resolved
| public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, | ||
| MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, | ||
| Map<String, String> opChainMetadata, long requestId, long deadlineMs) { | ||
| Map<String, String> opChainMetadata, long requestId, long deadlineMs, ThreadExecutionContext parentContext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parentContext should be nullable.
nit/suggestion: Can we have a executePipelineBreaker(OpChainSchedulerService scheduler,
MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan,
Map<String, String> opChainMetadata, long requestId, long deadlineMs) that just calls this method with null as last argument? That would reduce this PR by a large margin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the change. This was useful in PipelineBreakerTest only. In other usages, a variable is passed through and cannot be removed.
|
Test failure is unrelated. Looks like a flaky test |
track cpu and memory usage in multi-stage queries if query resource usage tracking is enabled
OOM Protection in Pinot depends on accounting of resource usage of threads that execute a query. Resource accounting is currently supported for Single-Stage queries only. This PR adds instrumentation for Multi-Stage query engine as well. OOM protection is automatically available as resource usage is accounted in same data structures checked by the OOM protection thread.
Tracing recognizes two types of threads: runners and workers. Runner submits worker tasks to an executor service. Runner setups a parent context with identification information such as query id. The id information in parent context is used by workers when reporting resource usage.
Query execution in broker and servers is tracked. In the broker, the runner is the same thread that executes
MultiStageRequestBrokerHandler. In the server, the runner is theQueryServer. Workers are setup inOpChainSchedulerService.The following operators have been instrumented. Instrumentation calls
Tracing.sample()to sample resource usage after processing a block from the input operator.AggregateOperatorHashJoinOperatorSetOperatorSortOperatorWindowAggregateOperatorThese operators where chosen because they allocate memory.
MailBoxSendOperatorandMailboxReceiveOperatorhave also been instrumented to sample usage across other operators.Tags: multi-stage, observability