Skip to content

Conversation

@andimiller
Copy link
Contributor

This adds support for BYTES columns containing Tuple Sketches with Integer as the summary type.

The added classes currently support Sum as the semigroup, but are generic so others can be added.

Feature breakdown:

  1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, eg. toIntegerSumTupleSketch(colA, colbB, 16)
  2. Add Codecs that use the Datasketches serialization
  3. Add aggregation functions:
  • DISTINCT_COUNT_TUPLE_SKETCH will just get the estimate for the number of unique keys, same as Theta or HLL
  • DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH will merge the sketches using Sum as the semigroup and return the raw sketch
  • SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH will merge the sketches using Sum as the semigroup and estimate the sum of the value side
  • AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH will merge the sketches using Sum as the semigroup and estimate the average of the value side
  1. Add ValueAggregator<_, _>s for use in StarTree indexes for all 4 above aggregations
  2. Add ValueAggregators for use in rollups for all 4 above aggregations

This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type.

The added classes currently support `Sum` as the semigroup, but are generic so others can be added.

Feature breakdown:

1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, eg. `toIntegerSumTupleSketch(colA, colbB, 16)`
2. Add Codecs that use the Datasketches serialization
3. Add aggregation functions:
  * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL
  * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch
  * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side
  * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side
4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations
5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
@andimiller
Copy link
Contributor Author

I could do with some advice on the best place to add tests for the aggregation functions, I've been looking through the existing tests and can't find anywhere suitable

@andimiller andimiller changed the title Tuple sketch support Integer Tuple Sketch support Mar 15, 2023
@codecov-commenter
Copy link

codecov-commenter commented Mar 15, 2023

Codecov Report

❌ Patch coverage is 1.82648% with 215 lines in your changes missing coverage. Please review.
✅ Project coverage is 32.05%. Comparing base (00d3133) to head (1b7fe74).
⚠️ Report is 3699 commits behind head on master.

Files with missing lines Patch % Lines
...unction/IntegerTupleSketchAggregationFunction.java 0.00% 88 Missing ⚠️
...he/pinot/core/function/scalar/SketchFunctions.java 0.00% 35 Missing ⚠️
.../aggregator/IntegerTupleSketchValueAggregator.java 0.00% 21 Missing ⚠️
...AvgValueIntegerTupleSketchAggregationFunction.java 0.00% 17 Missing ⚠️
...umValuesIntegerTupleSketchAggregationFunction.java 0.00% 15 Missing ⚠️
...org/apache/pinot/core/common/ObjectSerDeUtils.java 36.36% 7 Missing ⚠️
...nctCountIntegerTupleSketchAggregationFunction.java 0.00% 7 Missing ⚠️
...ssing/aggregator/IntegerTupleSketchAggregator.java 0.00% 7 Missing ⚠️
...he/pinot/segment/local/utils/CustomSerDeUtils.java 0.00% 7 Missing ⚠️
...gregation/function/AggregationFunctionFactory.java 0.00% 4 Missing ⚠️
... and 3 more

❗ There is a different number of reports uploaded between BASE (00d3133) and HEAD (1b7fe74). Click for more details.

HEAD has 2 uploads less than BASE
Flag BASE (00d3133) HEAD (1b7fe74)
integration1 1 0
unittests1 1 0
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #10427       +/-   ##
=============================================
- Coverage     70.30%   32.05%   -38.26%     
+ Complexity     6494      462     -6032     
=============================================
  Files          2158     2164        +6     
  Lines        116070   116274      +204     
  Branches      17566    17592       +26     
=============================================
- Hits          81608    37268    -44340     
- Misses        28778    75698    +46920     
+ Partials       5684     3308     -2376     
Flag Coverage Δ
integration1 ?
integration2 23.68% <1.82%> (-0.10%) ⬇️
unittests1 ?
unittests2 13.64% <0.00%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.


@Override
public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious to know if there's a reason why we have 2 ser/deser utilities (CustomSerDeUtils, ObjectSerDeUtils) ? @Jackie-Jiang

Copy link
Member

@davecromberge davecromberge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good @andimiller!

} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
} else if (value instanceof org.apache.datasketches.tuple.Sketch) {
return ObjectType.IntegerTupleSketch;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a safe assumption? Is it also necessary to inspect the summary type to verify integer?

Copy link
Contributor Author

@andimiller andimiller Mar 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now it is, but to add other types of tuple Sketch we'd need to add wrapper types, due to JVM type erasure

}
double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
return Double.valueOf(estimate).longValue();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the serde always deserialise bytes to a compact sketch? It could be better to use the base Sketch abstraction for cases where the sketches have been created outside the system and not compacted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can give that a go, I swapped it to all compact because I was having issues with the non-threadsafe nature of Sketch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same question as well :).

public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;


public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any references that can help explain this value?

Copy link
Contributor Author

@andimiller andimiller Mar 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment, it's the same as the theta one above, but log 2

is.update((String) key, value);
} else if (key instanceof byte[]) {
is.update((byte[]) key, value);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you want to validate/catch invalid types, consider throwing an IllegalStateException/illegalArg exception ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, added it for theta too and expanded the tests to cover

import org.apache.pinot.spi.data.FieldSpec.DataType;


public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the raw type (R) be Sketch, instead of byte[] here ? Looking at the other sketch implementation (DistinctCountThetaSketchValueAggregator), which has Object as the raw type, I just wanted to check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be but Sketch isn't thread-safe, and I swapped this to byte[] while hunting down some thread safety issues, I will see if I can swap it back now that I've made all the Union use thread safe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may need to be Object, this was a good catch, doing more local testing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have tested it more locally, it is fine being byte[] because we only handle aggregated sketches

return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
case DISTINCTCOUNTTUPLESKETCH:
// mode actually doesn't matter here because we only care about keys, not values
return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason why we pass IntegerSummary.Mode.Sum as a parameter ? We are already differentiating based on the aggregation implementations IntegerTupleSketchAggregationFunction vs AvgIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is the mode for IntegerSummary merging, all of these use Sum

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so there can be functions that can use other summary modes (min, max..) in the future.

import org.apache.pinot.segment.spi.AggregationFunctionType;


public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would composition + delegation make the APIs for Sum, Avg, distinct clearer than inheritance ? That way we know when/how IntegerTupleSketchAggregationFunction is exactly used and it'll decouple the Integer API from the rest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've followed the way it was implemented for Theta, using the simplest one as the base and inheriting it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes makes sense to keep them consistent.

}
ArrayList<CompactSketch<IntegerSummary>> merged =
new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
merged.addAll(intermediateResult1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - We dont want to do a union here for the merge? Im looking at DistinctCountThetaSketchAggregationFunction for reference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an optimisation similar to the one used in the Theta version, where merges can be quite expensive, and it's better to delay the merge til we have a lot of sketches to combine, hence using List as the intermediate type

}
}
} catch (Exception e) {
throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is groupBy and not merging tuple sketches ?

byte[] value = valueArray[i];
CompactSketch<IntegerSummary> newSketch =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
for (int groupKey : groupKeysArray[i]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks exactly the same as aggregateGroupBySV except that we iterate over group keys as it can be multivalued ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

import org.apache.pinot.segment.spi.AggregationFunctionType;


public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes makes sense to keep them consistent.

}
double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
return Double.valueOf(estimate).longValue();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same question as well :).

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Can you please rebase and resolve the conflict, and also respond to the pending comments?

@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes labels Apr 26, 2023
@swaminathanmanish
Copy link
Contributor

Thanks for taking care of comments !

@mayankshriv mayankshriv merged commit ded7e8f into apache:master May 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature release-notes Referenced by PRs that need attention when compiling the next release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants