Clamp early aggregation emit to the sort boundary when using partial group ordering #20446
alamb merged 2 commits into apache:main from
Conversation
Force-pushed 3ce1b2b to e3b1045
Force-pushed e3b1045 to f5382f4
alamb left a comment
Thank you @jackkleeman
I spent quite a while with this PR this afternoon and I think it looks correct to me.
However, I found the current formulation somewhat confusing, and I have offered a potential way to make it clearer (refactoring into a method). I don't think this is required before merge; I can make a follow-on PR if you prefer.
if let Some(batch) = self.emit(EmitTo::First(n), false)? {
// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
let n = match &self.group_ordering {
I found this logic quite confusing, and the use of 0 as a sentinel was also confusing. Could we maybe encapsulate it into a method? That way we could at least explain what it is doing better.
I tried a bunch of different formulations, and the best I could come up with was:
impl GroupedHashAggregateStream {
    ...
    // Clamp to the sort boundary when using partial group ordering,
    // otherwise remove_groups panics (#20445).
    if let Some(emit_to) = self.emit_target_for_oom() {
        if let Some(batch) = self.emit(emit_to, false)? {
            return Ok(Some(ExecutionState::ProducingOutput(batch)));
        }
    }
    ...
/// Returns how many groups to try and emit in order to avoid an out-of-memory
/// condition.
///
/// Returns `None` if emitting is not possible.
///
/// Returns Some(EmitTo) with the number of groups to emit if it is possible
/// to emit some groups to free memory
    fn emit_target_for_oom(&self) -> Option<EmitTo> {
        let n = if self.group_values.len() >= self.batch_size {
            // Try to emit an integer multiple of batch size if possible
            self.group_values.len() / self.batch_size * self.batch_size
        } else {
            // Otherwise emit whatever we can
            self.group_values.len()
        };

        // Special case for GroupOrdering::None since emit_to() returns None for
        // that case, but we can still emit some groups to try to resolve the OOM
        if matches!(&self.group_ordering, GroupOrdering::None) {
            return Some(EmitTo::First(n));
        }

        self.group_ordering.emit_to().map(|emit_to| match emit_to {
            // If the ordering allows emitting some groups,
            // emit as many as we can to try to resolve the OOM
            EmitTo::First(max) => EmitTo::First(n.min(max)),
            // If the ordering allows emitting all groups, we can emit n
            // groups to try to resolve the OOM
            EmitTo::All => EmitTo::First(n),
        })
    }
...
}

Here is the entire proposed diff:
index 35f32ac7a..5bd33aab5 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1038,26 +1038,50 @@ impl GroupedHashAggregateStream {
// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
- let n = match &self.group_ordering {
- GroupOrdering::None => n,
- _ => match self.group_ordering.emit_to() {
- Some(EmitTo::First(max)) => n.min(max),
- _ => 0,
- },
- };
-
- if n > 0
- && let Some(batch) = self.emit(EmitTo::First(n), false)?
- {
- Ok(Some(ExecutionState::ProducingOutput(batch)))
- } else {
- Err(oom)
+ if let Some(emit_to) = self.emit_target_for_oom() {
+ if let Some(batch) = self.emit(emit_to, false)? {
+ return Ok(Some(ExecutionState::ProducingOutput(batch)));
+ }
}
+
+ Err(oom)
}
_ => Err(oom),
}
}
+ /// Returns how many groups to try and emit in order to avoid an out-of-memory
+ /// condition.
+ ///
+ /// Returns `None` if emitting is not possible.
+ ///
+ /// Returns Some(EmitTo) with the number of groups to emit if it is possible
+ /// to emit some groups to free memory
+ fn emit_target_for_oom(&self) -> Option<EmitTo> {
+ let n = if self.group_values.len() >= self.batch_size {
+ // Try to emit an integer multiple of batch size if possible
+ self.group_values.len() / self.batch_size * self.batch_size
+ } else {
+ // Otherwise emit whatever we can
+ self.group_values.len()
+ };
+
+ // Special case for GroupOrdering::None since emit_to() returns None for
+ // that case, but we can still emit some groups to try to resolve the OOM
+ if matches!(&self.group_ordering, GroupOrdering::None) {
+ return Some(EmitTo::First(n));
+ };
+
+ self.group_ordering.emit_to().map(|emit_to| match emit_to {
+ // If the ordering allows emitting some groups,
+ // emit as many as we can to try to resolve the OOM,
+ EmitTo::First(max) => EmitTo::First(n.min(max)),
+ // if the ordering allows emitting all groups, we can emit n
+ // groups to try to resolve the OOM
+ EmitTo::All => EmitTo::First(n),
+ })
+ }
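The `n` computation in the proposed method rounds the group count down to a whole number of batches when at least one full batch is available. A standalone sketch of just that arithmetic (the function name `oom_emit_count` is hypothetical, not part of DataFusion):

```rust
// Hypothetical standalone sketch of the `n` computation above: round the
// number of groups down to a whole multiple of the batch size when we have
// at least one full batch, otherwise emit whatever groups we have.
fn oom_emit_count(num_groups: usize, batch_size: usize) -> usize {
    if num_groups >= batch_size {
        // Integer division floors, so this is the largest multiple of
        // batch_size that is <= num_groups.
        num_groups / batch_size * batch_size
    } else {
        // Fewer groups than one batch: emit whatever we can
        num_groups
    }
}

fn main() {
    assert_eq!(oom_emit_count(2500, 1024), 2048); // two full batches
    assert_eq!(oom_emit_count(500, 1024), 500); // less than one batch
    println!("ok");
}
```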
GroupOrdering::None => n,
_ => match self.group_ordering.emit_to() {
    Some(EmitTo::First(max)) => n.min(max),
    _ => 0,
Why not change this to emit n groups on Some(EmitTo::All)?

_ => match self.group_ordering.emit_to() {
    Some(EmitTo::First(max)) => n.min(max),
    Some(EmitTo::All) => n,
    _ => 0,

As I understand what it is doing, we are trying to ensure at most n groups are emitted
(this is incorporated above as well)
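The clamping behavior under discussion can be sketched with simplified stand-in types. This `clamp_emit` helper and the local `EmitTo` enum are illustrative only, not DataFusion's actual types; the real code consults `GroupOrdering::emit_to()` for the boundary:

```rust
// Simplified sketch: clamp the number of groups we want to emit (n) to the
// sort boundary reported by the group ordering, so we never emit groups that
// are still "open" (which is what triggers the remove_groups panic).
#[derive(Debug, PartialEq)]
enum EmitTo {
    First(usize),
    All,
}

// `boundary` mimics what the ordering reports: None means nothing is safe to
// emit yet; Some(First(max)) means only the first `max` groups are complete;
// Some(All) means every group is complete.
fn clamp_emit(n: usize, boundary: Option<EmitTo>) -> Option<EmitTo> {
    match boundary {
        // Emit at most the completed prefix of groups
        Some(EmitTo::First(max)) => Some(EmitTo::First(n.min(max))),
        // Everything is complete, so the full n is safe
        Some(EmitTo::All) => Some(EmitTo::First(n)),
        // No safe boundary yet: emit nothing rather than panic
        None => None,
    }
}

fn main() {
    // Want 8 groups but only 5 are past the sort boundary: clamp to 5.
    assert_eq!(clamp_emit(8, Some(EmitTo::First(5))), Some(EmitTo::First(5)));
    // All groups complete: emit the full n.
    assert_eq!(clamp_emit(8, Some(EmitTo::All)), Some(EmitTo::First(8)));
    // Boundary unknown: emitting would be unsafe, so emit nothing.
    assert_eq!(clamp_emit(8, None), None);
    println!("ok");
}
```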
FYI @2010YOUY01 as I think you have also worked on this part of the code
@jackkleeman are you ok with merging this PR as is (and I will backport it and propose the cleanup as a follow-on PR)?
Ok, in order to get 52.2.0 out, I am going to merge this PR and then make a proposed cleanup as a follow-on. I'll then backport this PR as-is to 52.2.0.
…group ordering (apache#20446)

## Which issue does this PR close?

- Closes apache#20445.

## What changes are included in this PR?

Fix a panic on early emit with partial sort aggregations, by clamping our emit point to the sort boundary

## Are these changes tested?

Yes

## Are there any user-facing changes?

No
…ing partial group ordering (#20446) (#20558)

## Which issue does this PR close?

- part of #20287
- Fixes #20445 on branch-52

## Rationale for this change

See issues

## What changes are included in this PR?

- backports #20446

## Are these changes tested?

By CI

Co-authored-by: Jack Kleeman <[email protected]>
Which issue does this PR close?
- Closes #20445: panic in `GroupOrderingPartial::remove_groups` when Partial aggregate with PartiallySorted hits memory pressure.

What changes are included in this PR?
Fix a panic on early emit with partial sort aggregations, by clamping our emit point to the sort boundary
Are these changes tested?
Yes
Are there any user-facing changes?
No