Integer Tuple Sketch support #10427
This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type. The added classes currently support `Sum` as the semigroup, but are generic so others can be added.

Feature breakdown:
1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, e.g. `toIntegerSumTupleSketch(colA, colbB, 16)`
2. Add Codecs that use the Datasketches serialization
3. Add aggregation functions:
   * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL
   * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch
   * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side
   * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side
4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations
5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
I could do with some advice on the best place to add tests for the aggregation functions. I've been looking through the existing tests and can't find anywhere suitable.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #10427 +/- ##
=============================================
- Coverage 70.30% 32.05% -38.26%
+ Complexity 6494 462 -6032
=============================================
Files 2158 2164 +6
Lines 116070 116274 +204
Branches 17566 17592 +26
=============================================
- Hits 81608 37268 -44340
- Misses 28778 75698 +46920
+ Partials 5684 3308 -2376
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
    @Override
    public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
      return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);
Just curious to know if there's a reason why we have two ser/deser utilities (`CustomSerDeUtils` and `ObjectSerDeUtils`)? @Jackie-Jiang
...nt-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
davecromberge
left a comment
Looks good @andimiller!
    } else if (value instanceof PinotFourthMoment) {
      return ObjectType.PinotFourthMoment;
    } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
      return ObjectType.IntegerTupleSketch;
Is this a safe assumption? Is it also necessary to inspect the summary type to verify integer?
right now it is, but to add other types of tuple Sketch we'd need to add wrapper types, due to JVM type erasure
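To illustrate the type erasure point, here is a minimal sketch. `Sketch<S>` and `IntegerTupleSketch` below are hypothetical stand-ins written for this example, not the DataSketches or Pinot classes. An `instanceof` check can only see the raw generic type, so two sketches with different summary types are indistinguishable at runtime unless a concrete wrapper class carries that information.

```java
public class ErasureDemo {
  // Hypothetical stand-in for a generic sketch type; not the DataSketches class.
  static class Sketch<S> {
    final S summary;

    Sketch(S summary) {
      this.summary = summary;
    }
  }

  // Hypothetical wrapper that pins the summary type into a concrete class.
  static final class IntegerTupleSketch {
    final Sketch<Integer> sketch;

    IntegerTupleSketch(Sketch<Integer> sketch) {
      this.sketch = sketch;
    }
  }

  static String classify(Object value) {
    if (value instanceof IntegerTupleSketch) {
      return "IntegerTupleSketch";
    } else if (value instanceof Sketch) {
      // The type argument is erased: Sketch<Integer> and Sketch<Double> both land here.
      return "Sketch (summary type unknown)";
    }
    return "other";
  }

  public static void main(String[] args) {
    System.out.println(classify(new Sketch<>(1)));
    System.out.println(classify(new Sketch<>(1.0)));
    System.out.println(classify(new IntegerTupleSketch(new Sketch<>(1))));
  }
}
```

The first two calls return the same label even though the summary types differ, which is why a second tuple sketch flavour would need its own wrapper type to be told apart.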
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
...c/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
      }
      double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
      return Double.valueOf(estimate).longValue();
    }
Does the serde always deserialise bytes to a compact sketch? It could be better to use the base Sketch abstraction for cases where the sketches have been created outside the system and not compacted.
I can give that a go, I swapped it to all compact because I was having issues with the non-threadsafe nature of Sketch
I had the same question as well :).
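For readers following the estimate line in the snippet above, here is a rough sketch of the arithmetic it appears to perform: the sum over the retained entries is divided by the number of retained entries and then scaled by the distinct-key estimate. The method and parameter names are hypothetical, and the sketch assumes `retainedTotal` is a floating-point value (with an integral type the division would truncate first). The zero guard is an addition for the example, not something shown in the PR.

```java
public class SumEstimateDemo {
  /**
   * Scales the sum over retained entries up to the whole key population.
   * All names are hypothetical; they mirror the quantities in the quoted snippet.
   */
  static long estimateSum(double retainedTotal, int retainedEntries, double distinctEstimate) {
    if (retainedEntries == 0) {
      return 0L; // empty sketch: nothing to scale, and this avoids dividing by zero
    }
    double meanPerKey = retainedTotal / retainedEntries;
    // Truncates toward zero, like Double.valueOf(estimate).longValue() in the snippet.
    return (long) (meanPerKey * distinctEstimate);
  }

  public static void main(String[] args) {
    // 100 retained keys whose summaries add up to 500, with an estimated 1000 distinct keys.
    System.out.println(estimateSum(500.0, 100, 1000.0)); // prints 5000
  }
}
```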
...he/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
...g/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java
...g/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
...nt-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;

    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
Any references that can help explain this value?
I'll add a comment, it's the same as the theta one above, but log 2
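The relationship between the two constants can be checked with a couple of lines. This is only an illustration of the "same value, but log 2" remark; the class and the `nominalEntries` helper are hypothetical, and the two constants are copied from the snippet above.

```java
public class LgKDemo {
  // Values quoted in the snippet above.
  static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
  static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

  // Nominal entries implied by a log2 value (hypothetical helper).
  static int nominalEntries(int lgK) {
    return 1 << lgK;
  }

  public static void main(String[] args) {
    System.out.println(nominalEntries(DEFAULT_TUPLE_SKETCH_LGK)); // prints 65536
    System.out.println(Integer.numberOfTrailingZeros(DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES)); // prints 16
  }
}
```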
      is.update((String) key, value);
    } else if (key instanceof byte[]) {
      is.update((byte[]) key, value);
    }
In case you want to validate/catch invalid types, consider throwing an `IllegalStateException` or `IllegalArgumentException`?
done, added it for theta too and expanded the tests to cover
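A minimal sketch of the pattern discussed in this thread: dispatch on the runtime type of the key and fail loudly on anything unexpected instead of silently skipping it. `RecordingSketch` is a hypothetical stand-in that only records the calls it receives, and the set of handled key types is an assumption for the example rather than the list used in the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyDispatchDemo {
  // Hypothetical stand-in for the sketch being updated; it only records what it was given.
  static class RecordingSketch {
    final List<String> updates = new ArrayList<>();

    void update(long key, int value) {
      updates.add("long:" + key + "=" + value);
    }

    void update(String key, int value) {
      updates.add("string:" + key + "=" + value);
    }

    void update(byte[] key, int value) {
      updates.add("bytes[" + key.length + "]=" + value);
    }
  }

  static void updateSketch(RecordingSketch sketch, Object key, int value) {
    if (key instanceof Long) {
      sketch.update(((Long) key).longValue(), value);
    } else if (key instanceof String) {
      sketch.update((String) key, value);
    } else if (key instanceof byte[]) {
      sketch.update((byte[]) key, value);
    } else {
      // Reject unsupported keys rather than dropping them without a trace.
      throw new IllegalArgumentException(
          "Unsupported key type: " + (key == null ? "null" : key.getClass().getName()));
    }
  }

  public static void main(String[] args) {
    RecordingSketch sketch = new RecordingSketch();
    updateSketch(sketch, "user-1", 3);
    updateSketch(sketch, new byte[] {1, 2}, 5);
    System.out.println(sketch.updates);
    try {
      updateSketch(sketch, 3.14, 1);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```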
    import org.apache.pinot.spi.data.FieldSpec.DataType;

    public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
Can the raw type (R) be Sketch, instead of byte[] here ? Looking at the other sketch implementation (DistinctCountThetaSketchValueAggregator), which has Object as the raw type, I just wanted to check.
it can be but Sketch isn't thread-safe, and I swapped this to byte[] while hunting down some thread safety issues, I will see if I can swap it back now that I've made all the Union use thread safe
it may need to be Object, this was a good catch, doing more local testing
have tested it more locally, it is fine being byte[] because we only handle aggregated sketches
      return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
    case DISTINCTCOUNTTUPLESKETCH:
      // mode actually doesn't matter here because we only care about keys, not values
      return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
is there a reason why we pass IntegerSummary.Mode.Sum as a parameter ? We are already differentiating based on the aggregation implementations IntegerTupleSketchAggregationFunction vs AvgIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction
that is the mode for IntegerSummary merging, all of these use Sum
Ok, so there can be functions that can use other summary modes (min, max..) in the future.
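As a rough illustration of what a summary mode controls, the sketch below merges two integer summaries for the same key under different modes. The `Mode` enum here is a hypothetical one defined for this example; it is not the DataSketches `IntegerSummary.Mode` type, and only `Sum` is used by the functions in this PR.

```java
public class SummaryModeDemo {
  // Hypothetical enum for illustration; not the DataSketches IntegerSummary.Mode type.
  enum Mode { SUM, MIN, MAX }

  // Combines the summaries of one key seen in two sketches.
  static int merge(Mode mode, int a, int b) {
    switch (mode) {
      case SUM:
        return a + b;
      case MIN:
        return Math.min(a, b);
      case MAX:
        return Math.max(a, b);
      default:
        throw new IllegalArgumentException("Unsupported mode: " + mode);
    }
  }

  public static void main(String[] args) {
    System.out.println(merge(Mode.SUM, 3, 5)); // prints 8
    System.out.println(merge(Mode.MIN, 3, 5)); // prints 3
    System.out.println(merge(Mode.MAX, 3, 5)); // prints 5
  }
}
```

For a distinct count only the keys matter, so the choice of mode does not change that result, which matches the code comment quoted above.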
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
    import org.apache.pinot.segment.spi.AggregationFunctionType;

    public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
Would composition + delegation make the APIs for Sum, Avg, distinct clearer than inheritance ? That way we know when/how IntegerTupleSketchAggregationFunction is exactly used and it'll decouple the Integer API from the rest.
I've followed the way it was implemented for Theta, using the simplest one as the base and inheriting it
Yes makes sense to keep them consistent.
...che/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
    }
    ArrayList<CompactSketch<IntegerSummary>> merged =
        new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
    merged.addAll(intermediateResult1);
Just curious: don't we want to do a union here for the merge? I'm looking at `DistinctCountThetaSketchAggregationFunction` for reference.
this is an optimisation similar to the one used in the Theta version: merges can be quite expensive, so it's better to delay the merge until we have a lot of sketches to combine, hence using List as the intermediate type
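The deferred-merge idea can be sketched as follows, using plain `Set<Integer>` values as a hypothetical stand-in for sketches (a real union is far more expensive than a list append, which is the point of the optimisation). The intermediate merge only concatenates the pending lists, and the union runs once when the final result is extracted. The class and method names are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeferredMergeDemo {
  // Cheap intermediate merge: concatenate the pending lists, no union yet.
  static List<Set<Integer>> merge(List<Set<Integer>> left, List<Set<Integer>> right) {
    List<Set<Integer>> merged = new ArrayList<>(left.size() + right.size());
    merged.addAll(left);
    merged.addAll(right);
    return merged;
  }

  // Expensive step, done once at the end: union everything that was accumulated.
  static int extractFinalResult(List<Set<Integer>> pending) {
    Set<Integer> union = new HashSet<>();
    for (Set<Integer> sketch : pending) {
      union.addAll(sketch);
    }
    return union.size();
  }

  // Small helper to build a stand-in "sketch" from literal keys.
  static Set<Integer> setOf(Integer... keys) {
    return new HashSet<>(Arrays.asList(keys));
  }

  public static void main(String[] args) {
    List<Set<Integer>> a = Arrays.asList(setOf(1, 2), setOf(2, 3));
    List<Set<Integer>> b = Arrays.asList(setOf(3, 4));
    System.out.println(extractFinalResult(merge(a, b))); // prints 4
  }
}
```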
        }
      }
    } catch (Exception e) {
      throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
I guess this is groupBy and not merging tuple sketches ?
    byte[] value = valueArray[i];
    CompactSketch<IntegerSummary> newSketch =
        ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
    for (int groupKey : groupKeysArray[i]) {
This looks exactly the same as aggregateGroupBySV except that we iterate over group keys as it can be multivalued ?
yup
Jackie-Jiang
left a comment
LGTM
Can you please rebase and resolve the conflict, and also respond to the pending comments?
Thanks for taking care of the comments!