-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[multistage] Initial (phase 1) Query runtime for window functions - empty OVER() and OVER(PARTITION BY) #10286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[multistage] Initial (phase 1) Query runtime for window functions - empty OVER() and OVER(PARTITION BY) #10286
Conversation
Codecov Report
@@ Coverage Diff @@
## master #10286 +/- ##
=============================================
+ Coverage 35.18% 70.34% +35.15%
- Complexity 245 5974 +5729
=============================================
Files 2027 2035 +8
Lines 109954 110309 +355
Branches 16711 16770 +59
=============================================
+ Hits 38688 77597 +38909
+ Misses 67914 27290 -40624
- Partials 3352 5422 +2070
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
...ry-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
Outdated
Show resolved
Hide resolved
ankitsultana
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excited to see this feature. Left some comments based on a cursory look.
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
ac4da49 to
e034704
Compare
ankitsultana
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a nit but lgtm. Thanks for addressing the feedback.
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...ry-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
Outdated
Show resolved
Hide resolved
...t-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
Outdated
Show resolved
Hide resolved
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
…ests for this scenario
0392aba to
2a37006
Compare
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Show resolved
Hide resolved
...ry-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
Outdated
Show resolved
Hide resolved
walterddr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm overall. tagging @xiangfu0 and @siddharthteotia for another look
vvivekiyer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
Will be reviewing this today |
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
Show resolved
Hide resolved
| * will output as many rows as input rows. | ||
| * | ||
| * Note: This class performs aggregation over the double value of input. | ||
| * If the input is single value, the output type will be input type. Otherwise, the output type will be double. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I follow this. What is the special casing here w.r.t single value v/s multi value ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is copied over from AggregateOperator (since both use the AggregationUtils) and basically discusses that if only a single input is part of the aggregation, then the Merger::initialize() function today doesn't cast it as a double, so the output won't be a double. For other cases it does use the double value:
default Object initialize(Object other, DataSchema.ColumnDataType dataType) {
// TODO: Initialize as a double so that if only one row is returned it matches the type when many rows are
// returned
return other == null ? dataType.getNullPlaceholder() : other;
}
Example merger function:
private static Object mergeMax(Object left, Object right) {
return Math.max(((Number) left).doubleValue(), ((Number) right).doubleValue());
}
I tried to fix this but ran into a bunch of test issues so decided to try and fix the initialize to also use doubleValue() in a separate PR if possible. Also wanted context on why the code was written in this way. Hope this clarifies the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@walterddr quick question in case you have context. Why do we initialize the aggregation output without using doubleValue() but use doubleValue() in the aggregation? Just wanted to understand the context and see if I can make a fix for this in a follow up PR
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Show resolved
Hide resolved
| List<RexExpression> orderSet, List<RelFieldCollation.Direction> orderSetDirection, | ||
| List<RelFieldCollation.NullDirection> orderSetNullDirection, List<RexExpression> aggCalls, int lowerBound, | ||
| int upperBound, boolean isRows, List<RexExpression> constants, DataSchema resultSchema, DataSchema inputSchema, | ||
| long requestId, int stageId, VirtualServerAddress virtualServerAddress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) is it possible to encapsulate the input into an POJO type object ? Might look cleaner / easy to read / review.
Not necessary for this PR though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah sure let me explore this as a future improvement
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
| _windowFrame = new WindowFrame(lowerBound, upperBound, isRows); | ||
|
|
||
| // TODO: add support for custom frames, and for ORDER BY default frame (upperBound => currentRow) | ||
| Preconditions.checkState(!_windowFrame.isRows(), "Only RANGE type frames are supported at present"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm not sure I understand this. I thought FRAME is not supported in the initial implementation based on the phases laid out in design doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These checks are for default frames (just to ensure we aren't getting inputs where the frames are overridden)
- Without
ORDER BY: default frame:UNBOUNDED PRECEDINGtoUNBOUNDED FOLLOWING - With
ORDER BY: default frame:UNBOUNDED PRECEDINGtoCURRENT ROW
Also whether ROWS or RANGE is the default depends on the default frame so that's why I check for that too.
| Preconditions.checkState(_windowFrame.isUnboundedPreceding(), | ||
| "Only default frame is supported, lowerBound must be UNBOUNDED PRECEDING"); | ||
| Preconditions.checkState(_windowFrame.isUnboundedFollowing() | ||
| || (_windowFrame.isUpperBoundCurrentRow() && isPartitionByOnly), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is 2nd check (after OR) needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary check because today we don't support window functions with ORDER BY where the ORDER BY key is different from the PARTITION BY key. We do support queries where the keys are the same, e.g. OVER(PARTITION BY key1 ORDER BY key1).
For queries such as OVER(PARTITION BY key1 ORDER BY key1), the default frame lands up getting set to UNBOUNDED PRECEDING to CURRENT ROW. This check here ensures that if the upperBound is indeed CURRENT ROW we should check that it's a query where the PARTITION BY and ORDER BY key is the same.
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Show resolved
Hide resolved
| // The lower bound of the frame. Set to Integer.MIN_VALUE if UNBOUNDED PRECEDING | ||
| final int _lowerBound; | ||
| // The lower bound of the frame. Set to Integer.MAX_VALUE if UNBOUNDED FOLLOWING. Set to 0 if CURRENT ROW | ||
| final int _upperBound; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) multi stage engine's query planning is likely to be heap heavy so we may want to double check if int is really needed or can we do away with short.
not a big deal in this PR but something to do in follow-up if folks agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great suggestion! I'll revisit these once I start work on frame support as I want to look into whether there is a recommendation on the frame limits or not. Will take this as an action item for frame support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's follow up later but IMO the most heavy part is not the memory size but the latency overhead :-P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree! we'll need to test this out for performance for sure
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Outdated
Show resolved
Hide resolved
| rows.add(row); | ||
| } | ||
| } | ||
| _hasReturnedWindowAggregateBlock = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The state that whether or not we can produce output block should be maintained and dictated by the piece of code that consumed the input block - consumeInputBlocks
Generally speaking, whenever an operator consumes an input block, before exiting that function it should be able to determine whether or not it is ready to produce output block and accordingly change the local state which can then be leveraged in getNextBlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^ might just be more intuitive but nothing wrong with the current approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to keep the operator flow similar to AggregateOperator for now. This flag _hasReturnedWindowAggregateBlock is meant to differentiate the scenario where we finished consumption and haven't yet sent the output block, vs. the scenario where we finished consumption and sent the output block already but now just need to send a final EOS. In both cases the consumeInputBlocks will indicate there is nothing more to send.
We can revisit this logic for all operators at once if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. FYI @walterddr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for now the logic is kind of there to guard the EOS as @somandal mentioned. we can revisit in general later. please kindly file an issue. (I am yet to understand what's the confusion and how can we improve it, but will look into it more generally once we file the issue with more context)
| List<Object[]> rowList = e.getValue(); | ||
| for (Object[] existingRow : rowList) { | ||
| Object[] row = new Object[existingRow.length + _aggCalls.size()]; | ||
| System.arraycopy(existingRow, 0, row, 0, existingRow.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really related to this PR but probably something we can improve generally
I am thinking that if we do the processing of operators in off heap buffers, then can we do something like this to avoid memcpy ?
Say we setup 3 ops in an op chain
Op1 -> Op2 -> Op3 from upstream to downstream
When the operators are setup, they get an input container (aka record batch with schema)
Op3's input container = Op2's output container
Op2's input container = Op1's output container
When the OpChain is setup, we call something like setup() on operator
Output setup (Input) {
// create Output container and return it
// the caller will use the returned Output as Input o setup the next downstream operator
}
We produce into Output but that does not require memcpy since it is direct buffers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great suggestion. Let's look into this as a separate change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. FYI @walterddr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont understand why we need to copy in the first place. window over is 1-in-1-out in the row perspective. it only caches the row it gets, accumulate the results for the additional columns, and attach the result back to the row when it finishes.
i agree we will have overhead for heap memory b/c we need to buffer everything. but not sure why we need a mem copy, it felt like if we are using List<Object> instead of Object[] we can completely avoid this copy yes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to create an output in the format of List<Object[]>, where each Object[] contains the contents of a single row. The output Object[] row size is larger than the input row's size since we need to store the aggregated columns. Using List<Object> instead of Object[] can avoid the copy as it can be appended to and added to the output result.
siddharthteotia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Yet to finish going through some of the test changes. Reviewed everything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm with some more comments. thank you @somandal for the contribution. this is a huge effort and we definitely appreciate the patience and consideration during the review/comment process.
| // Supported window functions | ||
| private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0, | ||
| SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT); | ||
| SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the other function we've supported? can we add a comment for this? (i think it is BOOL_AND and BOOL_OR)
but IMO, BOOL_AND and BOOL_OR are just MIN(booleanCol) and MAX(booleanCol) :-P i didn't understand why we needed that in the first place lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good point, I think we just saw that PostgreSQL has support for all aggregation functions and thought why not add what we have (at least the basic ones). Hope it's alright to leave this here. I've updated to add a comment here.
| // Frame literals come in the constants from the LogicalWindow and the bound.getOffset() stores the | ||
| // InputRef to the constants array offset by the input array length. These need to be extracted here and | ||
| // set to the bounds. | ||
| validateFrameBounds(windowGroup.lowerBound, windowGroup.upperBound, windowGroup.isRows); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented previously not sure isRows is a clear enough flag. let's name it isRowBased or use a enum
enum WindowBase {
ROW, RANGE
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done - I renamed it here, and store it as an enum in WindowAggregateOperator. If it's okay I'll create a util to store the enum in a common place for use in both places later as a followup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
never mind, updated this to add the enum directly into WindowNode and access it in WindowAggregateOperator.
| /** | ||
| * Utility class to perform aggregations in the intermediate stage operators such as {@code AggregateOperator} and | ||
| * {@code WindowAggregateOperator} | ||
| */ | ||
| public class AggregationUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SWEET refactoring!!! Thank you!!!
suggest javadoc
| /** | |
| * Utility class to perform aggregations in the intermediate stage operators such as {@code AggregateOperator} and | |
| * {@code WindowAggregateOperator} | |
| */ | |
| public class AggregationUtils { | |
| /** | |
| * Utility class to perform accumulation over a collection of rows. It provides util to | |
| * (1) method to deal with aggregation key and (2) method to merge a row into an existing accumulator | |
| * | |
| * <p>Accumulation is used by {@code WindowAggregateOperator} and {@code AggregateOperator}. | |
| */ | |
| public class AggregationUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...y-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
Show resolved
Hide resolved
| // The lower bound of the frame. Set to Integer.MIN_VALUE if UNBOUNDED PRECEDING | ||
| final int _lowerBound; | ||
| // The lower bound of the frame. Set to Integer.MAX_VALUE if UNBOUNDED FOLLOWING. Set to 0 if CURRENT ROW | ||
| final int _upperBound; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's follow up later but IMO the most heavy part is not the memory size but the latency overhead :-P
| List<Object[]> rowList = e.getValue(); | ||
| for (Object[] existingRow : rowList) { | ||
| Object[] row = new Object[existingRow.length + _aggCalls.size()]; | ||
| System.arraycopy(existingRow, 0, row, 0, existingRow.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont understand why we need to copy in the first place. window over is 1-in-1-out in the row perspective. it only caches the row it gets, accumulate the results for the additional columns, and attach the result back to the row when it finishes.
i agree we will have overhead for heap memory b/c we need to buffer everything. but not sure why we need a mem copy, it felt like if we are using List<Object> instead of Object[] we can completely avoid this copy yes?
| rows.add(row); | ||
| } | ||
| } | ||
| _hasReturnedWindowAggregateBlock = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for now the logic is kind of there to guard the EOS as @somandal mentioned. we can revisit in general later. please kindly file an issue. (I am yet to understand what's the confusion and how can we improve it, but will look into it more generally once we file the issue with more context)
Thanks @walterddr! Appreciate the thorough review and the guidance on this from all involved. |
This PR introduces the multi-stage runtime changes to support Phase 1 of Window Functions, specifically targeting empty
OVER()andOVER(PARTITION BY). Runtime support has been added for the following classes of window queries:OVER()- single and multiple emptyOVER()s in the same queryOVER(PARTITION BY)- single and multipleOVER(PARTITION BY)using the samePARTITION BYkey in the same queryThis PR also fixes some bugs in the window function planning, specifically:
OVER()andOVER(ORDER BY)as singleton stages which need a single node assignment (similar to global aggregation without group by and global sort)Add some logic forOVER(PARTITION BY key1 ORDER BY key1)queries which get marked asOVER(PARTITION BY key1)to ensure that theORDER BYkey1's direction and null direction are the defaults, otherwise treat these asOVER(PARTITION BY key1 ORDER BY key2)type queries during planning.The window functions supported as part of Phase 1 are: SUM, AVG, MIN, MAX, COUNT, BOOL_OR, and BOOL_AND.
PostgreSQL supports many aggregation functions inside window functions as mentioned in their documentation:
This PR also adds a new JSON file for window function query runtime tests via the output mechanism (H2 doesn't support window functions and got parse errors when trying to use it).
This PR does not include support for:
OVER(ORDER BY)andOVER (PARTITION BY ORDER BY)(will be the next PR for this feature)OVERclauses with differentPARTITION BY,ORDER BYand/orFRAMEspecifications, if these specifications are the same they get grouped into a single window group)The above will be part of future changes for window function support
cc @siddharthteotia @walterddr @vvivekiyer @Jackie-Jiang