feat: ResourceExhausted for memory limit in GroupedHashAggregateStream#4371
feat: ResourceExhausted for memory limit in GroupedHashAggregateStream#4371alamb merged 3 commits intoapache:masterfrom
ResourceExhausted for memory limit in GroupedHashAggregateStream#4371Conversation
| /// includes the allocated size (`capacity`) rather than the current length (`len`) | ||
| pub fn size(&self) -> usize { | ||
| std::mem::size_of_val(&self) | ||
| std::mem::size_of_val(self) |
There was a problem hiding this comment.
The tests panicked due to an integer underflow. Apart that the vector/hashmap calculations were wrong, this here was also kinda tricky: the size of &ScalarValue is 8 bytes, not 48 🤦.
alamb
left a comment
There was a problem hiding this comment.
This looks great to me -- thank you @crepererum
I have some minor style suggestions, but nothing that would prevent this PR from being merged.
For other reviewers, I found the changes easier to see with whitespace blind diff: https://github.com/apache/arrow-datafusion/pull/4371/files?w=1
I'll plan to merge this PR tomorrow unless anyone else wants more time to review
| }) | ||
| }; | ||
|
|
||
| let stream = futures::stream::unfold(inner, |mut this| async move { |
There was a problem hiding this comment.
| }; | ||
| // NOTE: do NOT include the `GroupState` struct size in here because this is captured by | ||
| // `group_states` (see allocation down below) | ||
| allocated += group_state |
There was a problem hiding this comment.
Figuring out how to encapsulate some of this accounting (so it wasn't inlined into the code) would make it easier to maintain I think. But I don't think that is required
There was a problem hiding this comment.
I think as long as the Allocator API in Rust isn't stable, this will be a bit of a mess. Once it is stable, we could have very elegant memory accounting.
| .try_for_each(|(accumulator, values)| match mode { | ||
| AggregateMode::Partial => accumulator.update_batch(&values), | ||
| AggregateMode::Partial => { | ||
| let size_pre = accumulator.size(); |
There was a problem hiding this comment.
You might also consider pulling the size accounting to before/after the match to avoid the duplication
| baseline_metrics, | ||
| )?)) | ||
| } | ||
| self.execute_typed(partition, context) |
|
Thanks again @crepererum |
|
Benchmark runs are scheduled for baseline = fe8aee6 and contender = be1d376. be1d376 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Add size_of_ref lint This addresses #9995, which is likely raising a valid point about `std::mem::size_of_val()`: It's [very easy to use double-references as the argument](apache/datafusion#4371 (comment)), which the function will happily accept and give back the size of _the reference_, not the size of the value _behind_ the reference. In the worst case, if the value matches the programmer's expectation, this seems to work, while in fact, everything will go horribly wrong e.g. on a different platform. The size of a `&T` is independent of what `T` is, and people might want to use `std::mem::size_of_val()` to actually get the size of _any_ reference (e.g. via `&&()`). I would rather suggest that this is always bad behavior, though ([instead](https://doc.rust-lang.org/reference/type-layout.html#pointers-and-references-layout), [and](https://doc.rust-lang.org/stable/std/primitive.usize.html#associatedconstant.BITS)). I, therefore, put this lint into `correctness`. Since the problem is usually easily fixed by removing extra `&`, I went light on suggesting code. --- changelog: New lint: [`size_of_ref`] [#10098](#10098) <!-- changelog_checked -->
Which issue does this PR close?
For #3940.
Update: This does NOT close the issue. I forgot the no-group version (
AggregateStream). Will do that in a follow-up PR. It's a rather simple change though.Rationale for this change
Ensure that users don't run out of memory while performing group-by operations. This is esp. important for servers or multi-tenant systems.
What changes are included in this PR?
This is similar to #4202. It includes an additional type
StreamTypeso we can double-check our test setup (namely: is the stream that we request actually the stream version we want).Are these changes tested?
Extended
test_oom. Also here are the perf results:I think the regressions (<3%) are within the safety margin of such a crucial feature (and also within what a laptop could reliable reproduce).
Are there any user-facing changes?
The V1 group-by op an now emit a
ResourceExhaustederror if it runs out of memory. Note that the error is kinda nested/wrapped due to #4172.