Support for median(distinct) aggregation function#10226
Conversation
| /// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes after taking | ||
| /// all unique values. This may use a lot of memory if the cardinality is high. | ||
| #[derive(Debug)] | ||
| pub struct DistinctMedian { |
There was a problem hiding this comment.
Lot of this code is duplicated with above median, but figured that since it's only duplicated once it should be fine? Or can macro it I suppose. Also colocated within existing median.rs file for this reason, to make it clear how similar they are.
There was a problem hiding this comment.
The main difference seems to be the Accumulator implementation
What do you think about adding a field on Median like distinct
pub struct DistinctMedian {
...
distinct: bool
}And then instantiating the correct accumulator increate_accumulator ? That would add an additional check when creating an accumulator but that seems inconsequential compared to the work to actually allocate and compute the median
There was a problem hiding this comment.
Sounds good, implemented 👍
| macro_rules! helper { | ||
| ($t:ty, $dt:expr) => { | ||
| Ok(Box::new(DistinctMedianAccumulator::<$t> { | ||
| data_type: $dt.clone(), | ||
| distinct_values: Default::default(), | ||
| })) | ||
| }; | ||
| } |
There was a problem hiding this comment.
Could we rename this macro to a more specific name?
There was a problem hiding this comment.
I think it follows the name used in Median
There was a problem hiding this comment.
Removed this bit of duplication by folding distinct into Median
alamb
left a comment
There was a problem hiding this comment.
Closes #2411
Wow -- another awesomely low ticket being fixed 👏
Thank you @Jefffrey and @Weijun-H -- I left some comments about how to potentially improve this PR but I think overall it looks really nice and could be merged a as is
I do think it would be nice to avoid duplicating DistinctMedian if possible, though I can't come up with a better way for the accumulator.
I think changing some of the tests so the input data wasn't sorted would also improve this PR.
| /// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes after taking | ||
| /// all unique values. This may use a lot of memory if the cardinality is high. | ||
| #[derive(Debug)] | ||
| pub struct DistinctMedian { |
There was a problem hiding this comment.
The main difference seems to be the Accumulator implementation
What do you think about adding a field on Median like distinct
pub struct DistinctMedian {
...
distinct: bool
}And then instantiating the correct accumulator increate_accumulator ? That would add an additional check when creating an accumulator but that seems inconsequential compared to the work to actually allocate and compute the median
| macro_rules! helper { | ||
| ($t:ty, $dt:expr) => { | ||
| Ok(Box::new(DistinctMedianAccumulator::<$t> { | ||
| data_type: $dt.clone(), | ||
| distinct_values: Default::default(), | ||
| })) | ||
| }; | ||
| } |
There was a problem hiding this comment.
I think it follows the name used in Median
| /// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values | ||
| /// in the final evaluation step so that we avoid expensive conversions and | ||
| /// allocations during `update_batch`. | ||
| struct DistinctMedianAccumulator<T: ArrowNumericType> { |
There was a problem hiding this comment.
I started playing around with trying to make a generic trait that could handle both Vec and HashSet. I couldn't make the types work out and I convinced myself it would end up being at least as much code as having the replication across accumulators. Thus I think having a copy/paste/modify version of DistinctMedianAccumulator is fine
/// A trait for a container of numeric types that can be compared
/// A `Vec` is used for Median and `HashSet` for DistinctMedian
trait MedianValues: Send + Sync + std::fmt::Debug {
type T: ArrowNativeType;
fn reserve(&mut self, additional: usize);
fn extend(&mut self, values: impl Iterator<Item = Self::T>);
fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
/// Convert the elements of this container into a ListArray
fn into_list_array(self) -> ListArray;
}
impl <T:ArrowNativeType> MedianValues for Vec<T> {
type T = T;
fn reserve(&mut self, additional: usize) {
todo!()
}
fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
todo!()
}
fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
todo!()
}
fn into_list_array(self) -> ListArray {
todo!()
}
}
/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
///
/// The intermediate state is represented as a List of scalar values updated by
/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>> {
data_type: DataType,
all_values: V,
}I couldn't quite make this work -- it errors like this
error[E0271]: type mismatch resolving `<Vec<i8> as MedianValues>::T == Int8Type`
--> datafusion/physical-expr/src/aggregate/median.rs:76:33
|
76 | all_values: vec![],
| ^^^^^^ type mismatch resolving `<Vec<i8> as MedianValues>::T == Int8Type`
...
81 | / downcast_integer! {
82 | | dt => (helper, dt),
83 | | DataType::Float16 => helper!(Float16Type, dt),
84 | | DataType::Float32 => helper!(Float32Type, dt),
... |
92 | | ))),
Here is the full diff if anyone wants to play around
Details
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 1049187a5..0e9b0b87d 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -23,7 +23,7 @@ use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
-use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType};
+use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType, ListArray};
use arrow_buffer::ArrowNativeType;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
@@ -71,7 +71,7 @@ impl AggregateExpr for Median {
use arrow_array::types::*;
macro_rules! helper {
($t:ty, $dt:expr) => {
- Ok(Box::new(MedianAccumulator::<$t> {
+ Ok(Box::new(MedianAccumulator::<$t, Vec<<$t as ArrowPrimitiveType>::Native>> {
data_type: $dt.clone(),
all_values: vec![],
}))
@@ -127,6 +127,39 @@ impl PartialEq<dyn Any> for Median {
}
}
+/// A trait for a container of numeric types that can be compared
+/// A `Vec` is used for Median and `HashSet` for DistinctMedian
+trait MedianValues: Send + Sync + std::fmt::Debug {
+ type T: ArrowNativeType;
+
+ fn reserve(&mut self, additional: usize);
+ fn extend(&mut self, values: impl Iterator<Item = Self::T>);
+ fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
+ /// Convert the elements of this container into a ListArray
+ fn into_list_array(self) -> ListArray;
+}
+
+impl <T:ArrowNativeType> MedianValues for Vec<T> {
+ type T = T;
+
+ fn reserve(&mut self, additional: usize) {
+ todo!()
+ }
+
+ fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
+ todo!()
+ }
+
+ fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
+ todo!()
+ }
+
+ fn into_list_array(self) -> ListArray {
+ todo!()
+ }
+}
+
+
/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
///
@@ -134,18 +167,18 @@ impl PartialEq<dyn Any> for Median {
/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
-struct MedianAccumulator<T: ArrowNumericType> {
+struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>> {
data_type: DataType,
- all_values: Vec<T::Native>,
+ all_values: V,
}
-impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {
+impl<T: ArrowNumericType, V: MedianValues<T = T>> std::fmt::Debug for MedianAccumulator<T, V> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "MedianAccumulator({})", self.data_type)
}
}
-impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
+impl<T: ArrowNumericType, V: MedianValues<T = T>> Accumulator for MedianAccumulator<T, V> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let all_values = self
.all_values| #[test] | ||
| fn distinct_median_decimal_with_nulls() -> Result<()> { | ||
| let array: ArrayRef = Arc::new( | ||
| vec![Some(1), Some(2), None, Some(3), Some(3), Some(3), Some(3)] |
There was a problem hiding this comment.
I recommend adding values in non sorted order in these tests to make sure there is nothing related to sorting going on
There was a problem hiding this comment.
Shuffled some of the tests
| } | ||
|
|
||
| let array = values[0].as_primitive::<T>(); | ||
| match array.nulls().filter(|x| x.null_count() > 0) { |
There was a problem hiding this comment.
Another way to check this I think that might be clearer is array.null_count() https://docs.rs/arrow/latest/arrow/array/trait.Array.html#method.null_count
There was a problem hiding this comment.
I actually just ripped this from sum_distinct:
datafusion/datafusion/physical-expr/src/aggregate/sum_distinct.rs
Lines 168 to 178 in 9c8873a
I guess its a way to avoid iterating over options since can use the inner null buffer to get valid indices, but not sure how much a performance difference it would make 🤷
There was a problem hiding this comment.
I think it is a common optimization (in the arrow-rs kernels and datafusion) to special case the 'no nulls' case -- if you know there are no nulls in the input you can avoid a branch (to check for null) in the inner loop, which gives the compiler a better chance for auto-vectorization
There was a problem hiding this comment.
My point in #10226 (comment) was that the way this is checking for no-nulls seems overly obscure to me
I think the code could look like
if array.null_count() > 0 {
for val in array.iter() {
if let Some(value) = val {
self.distinct_values.insert(Hashable(value))
}
} else {
array.values().iter().for_each(|x| {
self.distinct_values.insert(Hashable(*x));
}),
}But I also don't think it is a big deal
|
Thanks a lot @Jefffrey -- looks great ❤️ |
Which issue does this PR close?
Closes #2411
Rationale for this change
Support for explicit distinct median aggregation function, allowing queries such as below to execute now:
What changes are included in this PR?
Essentially duplicated the existing Median aggregation function but aggregate into HashSet instead of a Vec.
Chose HashSet over BTreeMap as figured the insert path is hotter than the fetch path (which is essentially once at evaluation), and (limited) benchmarking didn't show too significant a difference between the two.
Are these changes tested?
Yes, tests added
Are there any user-facing changes?