@somandal
Contributor

@somandal somandal commented Feb 15, 2023

This PR introduces the multi-stage runtime changes to support Phase 1 of Window Functions, specifically targeting empty OVER() and OVER(PARTITION BY). Runtime support has been added for the following classes of window queries:

  • Empty OVER() - single and multiple empty OVER()s in the same query
  • OVER(PARTITION BY) - single and multiple OVER(PARTITION BY) using the same PARTITION BY key in the same query
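To illustrate what these two query shapes compute, here is a minimal sketch (not the Pinot operator itself). The class and method names are hypothetical, and rows are modelled as plain `Object[]` arrays. In both cases the output has exactly as many rows as the input, with one extra column holding the aggregate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: hypothetical helper showing the semantics, not the PR's implementation. */
final class WindowSumSketch {

  /** SUM(value) OVER(): all rows form one partition, so every row receives the same total. */
  static List<Object[]> sumOverAll(List<Object[]> rows, int valueIdx) {
    double total = 0;
    for (Object[] row : rows) {
      total += ((Number) row[valueIdx]).doubleValue();
    }
    List<Object[]> out = new ArrayList<>(rows.size());
    for (Object[] row : rows) {
      out.add(withExtraColumn(row, total));
    }
    return out;
  }

  /** SUM(value) OVER(PARTITION BY key): every row receives the total of its own partition. */
  static List<Object[]> sumOverPartition(List<Object[]> rows, int keyIdx, int valueIdx) {
    Map<Object, Double> sums = new HashMap<>();
    for (Object[] row : rows) {
      sums.merge(row[keyIdx], ((Number) row[valueIdx]).doubleValue(), Double::sum);
    }
    List<Object[]> out = new ArrayList<>(rows.size());
    for (Object[] row : rows) {
      out.add(withExtraColumn(row, sums.get(row[keyIdx])));
    }
    return out;
  }

  /** Copies the row into a new array that is one column wider and sets the last column. */
  private static Object[] withExtraColumn(Object[] row, Object value) {
    Object[] extended = Arrays.copyOf(row, row.length + 1);
    extended[row.length] = value;
    return extended;
  }
}
```

Unlike a GROUP BY aggregation, neither method collapses rows: the input order and row count are preserved.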

This PR also fixes some bugs in the window function planning, specifically:

  • Create Empty OVER() and OVER(ORDER BY) as singleton stages which need a single node assignment (similar to global aggregation without group by and global sort)
  • Add some logic for OVER(PARTITION BY key1 ORDER BY key1) queries which get marked as OVER(PARTITION BY key1) to ensure that the ORDER BY key1's direction and null direction are the defaults, otherwise treat these as OVER(PARTITION BY key1 ORDER BY key2) type queries during planning.

The window functions supported as part of Phase 1 are: SUM, AVG, MIN, MAX, COUNT, BOOL_OR, and BOOL_AND.
PostgreSQL supports many aggregation functions inside window functions as mentioned in their documentation:

In addition to these functions, any built-in or user-defined ordinary aggregate (i.e., not ordered-set or hypothetical-set aggregates) can be used as a window function; see Section 9.21 for a list of the built-in aggregates. Aggregate functions act as window functions only when an OVER clause follows the call; otherwise they act as plain aggregates and return a single row for the entire set.

This PR also adds a new JSON file for window function query runtime tests, verified via the output mechanism (H2 doesn't support window functions and returned parse errors when I tried to use it).

This PR does not include support for:

  • Execution engine changes for Phase 1 for OVER(ORDER BY) and OVER (PARTITION BY ORDER BY) (will be the next PR for this feature)
  • Custom frames
  • Other window functions related to rank and values
  • Multiple window groups (basically multiple OVER clauses with different PARTITION BY, ORDER BY and/or FRAME specifications, if these specifications are the same they get grouped into a single window group)

The above will be part of future changes for window function support.

cc @siddharthteotia @walterddr @vvivekiyer @Jackie-Jiang

@codecov-commenter

codecov-commenter commented Feb 15, 2023

Codecov Report

Merging #10286 (1c6af7d) into master (1f726dc) will increase coverage by 35.15%.
The diff coverage is 92.55%.

@@              Coverage Diff              @@
##             master   #10286       +/-   ##
=============================================
+ Coverage     35.18%   70.34%   +35.15%     
- Complexity      245     5974     +5729     
=============================================
  Files          2027     2035        +8     
  Lines        109954   110309      +355     
  Branches      16711    16770       +59     
=============================================
+ Hits          38688    77597    +38909     
+ Misses        67914    27290    -40624     
- Partials       3352     5422     +2070     
Flag Coverage Δ
integration1 24.37% <0.00%> (-0.11%) ⬇️
integration2 24.40% <0.00%> (-0.05%) ⬇️
unittests1 67.85% <92.55%> (?)
unittests2 13.74% <0.00%> (-0.02%) ⬇️

Impacted Files Coverage Δ
.../org/apache/pinot/query/planner/StageMetadata.java 93.93% <66.66%> (+93.93%) ⬆️
...g/apache/pinot/query/planner/stage/WindowNode.java 80.00% <66.66%> (+80.00%) ⬆️
...uery/runtime/operator/WindowAggregateOperator.java 91.66% <91.66%> (ø)
...query/runtime/operator/utils/AggregationUtils.java 96.49% <96.49%> (ø)
...inot/query/runtime/operator/AggregateOperator.java 92.30% <100.00%> (+92.30%) ⬆️
.../pinot/query/runtime/plan/PhysicalPlanVisitor.java 97.67% <100.00%> (+97.67%) ⬆️
...data/manager/realtime/DefaultSegmentCommitter.java 50.00% <0.00%> (-30.00%) ⬇️
...er/api/resources/LLCSegmentCompletionHandlers.java 43.56% <0.00%> (-18.82%) ⬇️
...altime/ServerSegmentCompletionProtocolHandler.java 51.88% <0.00%> (-6.61%) ⬇️
...not/broker/broker/helix/ClusterChangeMediator.java 75.26% <0.00%> (-5.38%) ⬇️
... and 1184 more

@siddharthteotia siddharthteotia changed the title from "[multistage] Initial (phase 1) Add query runtime support for empty OVER() and OVER(PARTITION BY) window functions" to "[multistage] Initial (phase 1) Query runtime for window functions - empty OVER() and OVER(PARTITION BY)" Feb 15, 2023
Contributor

@ankitsultana ankitsultana left a comment

Excited to see this feature. Left some comments based on a cursory look.

@somandal somandal force-pushed the window-functions-execution-phase1 branch from ac4da49 to e034704 Compare February 22, 2023 05:44
Contributor

@ankitsultana ankitsultana left a comment

left a nit but lgtm. Thanks for addressing the feedback.

@somandal somandal requested a review from walterddr February 24, 2023 21:54
@somandal somandal force-pushed the window-functions-execution-phase1 branch from 0392aba to 2a37006 Compare February 24, 2023 23:02
Contributor

@walterddr walterddr left a comment

lgtm overall. tagging @xiangfu0 and @siddharthteotia for another look

Contributor

@vvivekiyer vvivekiyer left a comment

LGTM.

@siddharthteotia
Contributor

Will be reviewing this today

* will output as many rows as input rows.
*
* Note: This class performs aggregation over the double value of input.
* If the input is single value, the output type will be input type. Otherwise, the output type will be double.
Contributor

Not sure I follow this. What is the special casing here w.r.t single value v/s multi value ?

Contributor Author

This comment is copied over from AggregateOperator (both use AggregationUtils). It means that if only a single input row is part of the aggregation, the Merger::initialize() function today doesn't cast it to a double, so the output won't be a double. In all other cases the merge uses the double value:

    default Object initialize(Object other, DataSchema.ColumnDataType dataType) {
      // TODO: Initialize as a double so that if only one row is returned it matches the type when many rows are
      //       returned
      return other == null ? dataType.getNullPlaceholder() : other;
    }

Example merger function:

  private static Object mergeMax(Object left, Object right) {
    return Math.max(((Number) left).doubleValue(), ((Number) right).doubleValue());
  }

I tried to fix this but ran into a bunch of test issues so decided to try and fix the initialize to also use doubleValue() in a separate PR if possible. Also wanted context on why the code was written in this way. Hope this clarifies the comment.
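A minimal sketch of the discrepancy described here, assuming a simplified `initialize` that takes a plain placeholder instead of `DataSchema.ColumnDataType` (the class name and the `max` driver are hypothetical). With one input row the accumulator keeps the input's own type; as soon as a second row is merged, the result becomes a `Double`.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: simplified stand-ins for the quoted initialize() and mergeMax(). */
final class MergerTypeSketch {

  /** Like the quoted initialize(): the first value is stored as-is, with no conversion to double. */
  static Object initialize(Object other, Object nullPlaceholder) {
    return other == null ? nullPlaceholder : other;
  }

  /** Like the quoted mergeMax(): merging always goes through doubleValue(). */
  static Object mergeMax(Object left, Object right) {
    return Math.max(((Number) left).doubleValue(), ((Number) right).doubleValue());
  }

  /** Hypothetical driver: initialize on the first value, merge on every later one. */
  static Object max(List<Object> values) {
    Object acc = null;
    boolean first = true;
    for (Object value : values) {
      acc = first ? initialize(value, 0) : mergeMax(acc, value);
      first = false;
    }
    return acc;
  }

  public static void main(String[] args) {
    Object single = max(Arrays.<Object>asList(5));
    Object multiple = max(Arrays.<Object>asList(5, 7));
    System.out.println(single.getClass().getSimpleName());   // type of the single-row result
    System.out.println(multiple.getClass().getSimpleName()); // type of the multi-row result
  }
}
```

Initializing through `doubleValue()` as well, as the TODO suggests, would make both cases return the same type.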

Contributor Author

@walterddr quick question in case you have context. Why do we initialize the aggregation output without using doubleValue() but use doubleValue() in the aggregation? Just wanted to understand the context and see if I can make a fix for this in a follow up PR

List<RexExpression> orderSet, List<RelFieldCollation.Direction> orderSetDirection,
List<RelFieldCollation.NullDirection> orderSetNullDirection, List<RexExpression> aggCalls, int lowerBound,
int upperBound, boolean isRows, List<RexExpression> constants, DataSchema resultSchema, DataSchema inputSchema,
long requestId, int stageId, VirtualServerAddress virtualServerAddress) {
Contributor

(nit) is it possible to encapsulate the input into a POJO type object? Might look cleaner / easier to read / review.

Not necessary for this PR though

Contributor Author

yeah sure let me explore this as a future improvement
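One possible shape for such a parameter object, purely as a sketch: the `WindowSpecSketch` class, its builder, and the use of `List<String>` in place of `List<RexExpression>` are all assumptions made so the example is self-contained. Only a few of the constructor arguments are modelled.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical parameter object grouping the window-related constructor arguments. */
final class WindowSpecSketch {
  final List<String> partitionKeys; // stand-in for the List<RexExpression> group set
  final List<String> orderKeys;     // stand-in for the List<RexExpression> order set
  final int lowerBound;
  final int upperBound;
  final boolean isRows;

  private WindowSpecSketch(Builder b) {
    partitionKeys = b.partitionKeys;
    orderKeys = b.orderKeys;
    lowerBound = b.lowerBound;
    upperBound = b.upperBound;
    isRows = b.isRows;
  }

  static Builder builder() {
    return new Builder();
  }

  /** Named setters replace a long positional argument list. */
  static final class Builder {
    private List<String> partitionKeys = Collections.emptyList();
    private List<String> orderKeys = Collections.emptyList();
    private int lowerBound = Integer.MIN_VALUE; // UNBOUNDED PRECEDING
    private int upperBound = Integer.MAX_VALUE; // UNBOUNDED FOLLOWING
    private boolean isRows = false;

    Builder partitionKeys(List<String> keys) { partitionKeys = keys; return this; }
    Builder orderKeys(List<String> keys) { orderKeys = keys; return this; }
    Builder bounds(int lower, int upper) { lowerBound = lower; upperBound = upper; return this; }
    Builder rows(boolean rows) { isRows = rows; return this; }

    WindowSpecSketch build() { return new WindowSpecSketch(this); }
  }
}
```

A call site would then read `WindowSpecSketch.builder().partitionKeys(keys).bounds(lower, upper).build()`, which keeps each argument labelled at the point of use.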

_windowFrame = new WindowFrame(lowerBound, upperBound, isRows);

// TODO: add support for custom frames, and for ORDER BY default frame (upperBound => currentRow)
Preconditions.checkState(!_windowFrame.isRows(), "Only RANGE type frames are supported at present");
Contributor

Hmm not sure I understand this. I thought FRAME is not supported in the initial implementation based on the phases laid out in design doc.

Contributor Author

These checks are for default frames (just to ensure we aren't getting inputs where the frames are overridden)

  • Without ORDER BY: default frame: UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING
  • With ORDER BY: default frame: UNBOUNDED PRECEDING to CURRENT ROW

Also whether ROWS or RANGE is the default depends on the default frame so that's why I check for that too.

Preconditions.checkState(_windowFrame.isUnboundedPreceding(),
"Only default frame is supported, lowerBound must be UNBOUNDED PRECEDING");
Preconditions.checkState(_windowFrame.isUnboundedFollowing()
|| (_windowFrame.isUpperBoundCurrentRow() && isPartitionByOnly),
Contributor

Why is 2nd check (after OR) needed ?

Contributor Author

This is a temporary check because today we don't support window functions with ORDER BY where the ORDER BY key is different from the PARTITION BY key. We do support queries where the keys are the same, e.g. OVER(PARTITION BY key1 ORDER BY key1).

For queries such as OVER(PARTITION BY key1 ORDER BY key1), the default frame ends up being set to UNBOUNDED PRECEDING to CURRENT ROW. This check ensures that if the upperBound is CURRENT ROW, the query is one where the PARTITION BY and ORDER BY keys are the same.
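The frame checks discussed in this thread and the previous one can be sketched together as follows. This is a simplified stand-in, not the PR's code: the class name, the constants, and the upper-bound error message are assumptions, and plain exceptions replace Guava's `Preconditions`. The bound encoding follows the quoted field comments (`Integer.MIN_VALUE`, `Integer.MAX_VALUE`, and 0 for CURRENT ROW).

```java
/** Illustrative only: default-frame validation under the assumptions stated above. */
final class FrameCheckSketch {
  static final int UNBOUNDED_PRECEDING = Integer.MIN_VALUE;
  static final int UNBOUNDED_FOLLOWING = Integer.MAX_VALUE;
  static final int CURRENT_ROW = 0;

  /**
   * Accepts only the default frames:
   *   without ORDER BY: RANGE UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING
   *   PARTITION BY key1 ORDER BY key1 (treated as partition-by only): upper bound may be CURRENT ROW
   */
  static void validate(int lowerBound, int upperBound, boolean isRows, boolean isPartitionByOnly) {
    if (isRows) {
      throw new IllegalStateException("Only RANGE type frames are supported at present");
    }
    if (lowerBound != UNBOUNDED_PRECEDING) {
      throw new IllegalStateException(
          "Only default frame is supported, lowerBound must be UNBOUNDED PRECEDING");
    }
    boolean upperBoundOk = upperBound == UNBOUNDED_FOLLOWING
        || (upperBound == CURRENT_ROW && isPartitionByOnly);
    if (!upperBoundOk) {
      // Message text is hypothetical; the original message is not shown in the thread.
      throw new IllegalStateException("Only default frame is supported, unexpected upperBound");
    }
  }
}
```

The second half of the OR is what lets a same-key PARTITION BY / ORDER BY query through while still rejecting a CURRENT ROW upper bound for any other query shape.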

// The lower bound of the frame. Set to Integer.MIN_VALUE if UNBOUNDED PRECEDING
final int _lowerBound;
// The upper bound of the frame. Set to Integer.MAX_VALUE if UNBOUNDED FOLLOWING. Set to 0 if CURRENT ROW
final int _upperBound;
Contributor

(nit) multi stage engine's query planning is likely to be heap heavy so we may want to double check if int is really needed or can we do away with short.

not a big deal in this PR but something to do in follow-up if folks agree

Contributor Author

Great suggestion! I'll revisit these once I start work on frame support as I want to look into whether there is a recommendation on the frame limits or not. Will take this as an action item for frame support.

Contributor

let's follow up later but IMO the most heavy part is not the memory size but the latency overhead :-P

Contributor Author

Agree! we'll need to test this out for performance for sure

rows.add(row);
}
}
_hasReturnedWindowAggregateBlock = true;
Contributor

@siddharthteotia siddharthteotia Mar 2, 2023

The state indicating whether or not we can produce an output block should be maintained and dictated by the piece of code that consumes the input block: consumeInputBlocks

Generally speaking, whenever an operator consumes an input block, before exiting that function it should be able to determine whether or not it is ready to produce output block and accordingly change the local state which can then be leveraged in getNextBlock.

Contributor

^^ might just be more intuitive but nothing wrong with the current approach

Contributor Author

I've tried to keep the operator flow similar to AggregateOperator for now. This flag _hasReturnedWindowAggregateBlock is meant to differentiate the scenario where we finished consumption and haven't yet sent the output block, vs. the scenario where we finished consumption and sent the output block already but now just need to send a final EOS. In both cases the consumeInputBlocks will indicate there is nothing more to send.

We can revisit this logic for all operators at once if needed.
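A minimal sketch of the flag's role, assuming a heavily simplified operator: blocks are represented as strings, input is a list of integers that is drained in one go, and the class and field names are hypothetical. The point is only that "consumption finished" alone cannot tell the first call (emit the result) apart from later calls (emit EOS).

```java
import java.util.Iterator;
import java.util.List;

/** Illustrative only: an operator that emits one result block and then end-of-stream. */
final class EmitOnceOperatorSketch {
  private final Iterator<Integer> _input;
  private long _sum;
  private boolean _hasReturnedResultBlock;

  EmitOnceOperatorSketch(List<Integer> input) {
    _input = input.iterator();
  }

  /** Drains the input; returns true once there is nothing more to consume. */
  private boolean consumeInputBlocks() {
    while (_input.hasNext()) {
      _sum += _input.next();
    }
    return true;
  }

  /** After consumption has finished, the flag decides between the result block and EOS. */
  String getNextBlock() {
    consumeInputBlocks();
    if (!_hasReturnedResultBlock) {
      _hasReturnedResultBlock = true;
      return "DATA(sum=" + _sum + ")";
    }
    return "EOS";
  }
}
```

Every call after the first sees the same "finished" answer from `consumeInputBlocks()`, so the extra boolean is what prevents the result block from being sent twice.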

Contributor

Sounds good. FYI @walterddr

Contributor

for now the logic is kind of there to guard the EOS as @somandal mentioned. we can revisit in general later. please kindly file an issue. (I am yet to understand what's the confusion and how can we improve it, but will look into it more generally once we file the issue with more context)

List<Object[]> rowList = e.getValue();
for (Object[] existingRow : rowList) {
Object[] row = new Object[existingRow.length + _aggCalls.size()];
System.arraycopy(existingRow, 0, row, 0, existingRow.length);
Contributor

@siddharthteotia siddharthteotia Mar 2, 2023

Not really related to this PR but probably something we can improve generally

I am thinking that if we do the processing of operators in off heap buffers, then can we do something like this to avoid memcpy ?

Say we setup 3 ops in an op chain

Op1 -> Op2 -> Op3 from upstream to downstream

When the operators are setup, they get an input container (aka record batch with schema)

Op3's input container = Op2's output container
Op2's input container = Op1's output container

When the OpChain is setup, we call something like setup() on operator

Output setup (Input) {
   // create Output container and return it
   // the caller will use the returned Output as Input to set up the next downstream operator
}

We produce into Output but that does not require memcpy since it is direct buffers
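The wiring idea can be sketched in Java as follows. Everything here is hypothetical (`Container`, `Op`, `wire`, the fixed 1024-byte capacity); it only shows that each operator's input container is the same object as the upstream operator's output container, backed by a direct buffer from `java.nio.ByteBuffer.allocateDirect`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: chaining operators through shared off-heap containers. */
final class OpChainSketch {

  /** Hypothetical record batch backed by a direct (off-heap) buffer. */
  static final class Container {
    final ByteBuffer buffer;

    Container(int capacityBytes) {
      buffer = ByteBuffer.allocateDirect(capacityBytes);
    }
  }

  /** Hypothetical operator: setup() binds the input and returns the output it will write into. */
  static final class Op {
    final String name;
    Container input;
    Container output;

    Op(String name) {
      this.name = name;
    }

    Container setup(Container in) {
      input = in;
      output = new Container(1024); // arbitrary capacity for the sketch
      return output;
    }
  }

  /** Wires operators upstream to downstream: each setup() result feeds the next setup() call. */
  static List<Op> wire(Container source, String... names) {
    List<Op> chain = new ArrayList<>();
    Container current = source;
    for (String name : names) {
      Op op = new Op(name);
      current = op.setup(current);
      chain.add(op);
    }
    return chain;
  }
}
```

Because Op2 reads from the very buffer Op1 writes into, handing a batch downstream passes a reference rather than copying row data.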

Contributor Author

Great suggestion. Let's look into this as a separate change.

Contributor

Sounds good. FYI @walterddr

Contributor

i don't understand why we need to copy in the first place. window over is 1-in-1-out from the row perspective. it only caches the rows it gets, accumulates the results for the additional columns, and attaches the result back to each row when it finishes.

i agree we will have overhead for heap memory b/c we need to buffer everything. but not sure why we need a mem copy, it felt like if we are using List<Object> instead of Object[] we can completely avoid this copy yes?

Contributor Author

We need to create an output in the format of List<Object[]>, where each Object[] contains the contents of a single row. The output Object[] row size is larger than the input row's size since we need to store the aggregated columns. Using List<Object> instead of Object[] can avoid the copy as it can be appended to and added to the output result.
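The two options discussed in this thread can be compared in a small sketch (class and method names are hypothetical). The first method mirrors the quoted `System.arraycopy` approach; the second appends aggregate results to a buffered `List<Object>` row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: widening a row with aggregate columns, with and without an explicit copy. */
final class RowAppendSketch {

  /** Current approach: allocate a wider Object[] and copy the existing columns into it. */
  static Object[] appendWithCopy(Object[] existingRow, Object[] aggResults) {
    Object[] row = new Object[existingRow.length + aggResults.length];
    System.arraycopy(existingRow, 0, row, 0, existingRow.length);
    System.arraycopy(aggResults, 0, row, existingRow.length, aggResults.length);
    return row;
  }

  /** Alternative: the buffered row is a mutable list, so results are appended to the same object. */
  static List<Object> appendInPlace(List<Object> bufferedRow, Object[] aggResults) {
    Collections.addAll(bufferedRow, aggResults);
    return bufferedRow;
  }

  public static void main(String[] args) {
    Object[] copied = appendWithCopy(new Object[]{"a", 1}, new Object[]{3.0});
    List<Object> appended = appendInPlace(new ArrayList<>(Arrays.<Object>asList("a", 1)), new Object[]{3.0});
    System.out.println(Arrays.toString(copied));
    System.out.println(appended);
  }
}
```

One caveat: an `ArrayList` may still reallocate and copy its backing array internally when it grows past its capacity, so avoiding the copy entirely would likely require sizing the list for the extra columns up front.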

@somandal somandal requested a review from siddharthteotia March 2, 2023 15:59
Contributor

@siddharthteotia siddharthteotia left a comment

LGTM. Yet to finish going through some of the test changes. Reviewed everything else.

Contributor

@walterddr walterddr left a comment

lgtm with some more comments. thank you @somandal for the contribution. this is a huge effort and we definitely appreciate the patience and consideration during the review/comment process.

// Supported window functions
private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
- SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
+ SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);
Contributor

what is the other function we've supported? can we add a comment for this? (i think it is BOOL_AND and BOOL_OR)

but IMO, BOOL_AND and BOOL_OR are just MIN(booleanCol) and MAX(booleanCol) :-P i didn't understand why we needed that in the first place lol

Contributor Author

Ah good point, I think we just saw that PostgreSQL has support for all aggregation functions and thought why not add what we have (at least the basic ones). Hope it's alright to leave this here. I've updated to add a comment here.
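The equivalence the reviewer points out can be checked with a small sketch, assuming booleans are encoded as 0 and 1 (class and method names are hypothetical): BOOL_AND is true exactly when the minimum is 1, and BOOL_OR is true exactly when the maximum is 1.

```java
/** Illustrative only: BOOL_AND / BOOL_OR expressed as MIN / MAX over a 0/1 encoding. */
final class BoolAggSketch {

  static boolean boolAnd(boolean[] values) {
    boolean acc = true;
    for (boolean v : values) {
      acc = acc && v;
    }
    return acc;
  }

  static boolean boolOr(boolean[] values) {
    boolean acc = false;
    for (boolean v : values) {
      acc = acc || v;
    }
    return acc;
  }

  /** MIN over the encoded values (true = 1, false = 0). */
  static int minEncoded(boolean[] values) {
    int min = 1;
    for (boolean v : values) {
      min = Math.min(min, v ? 1 : 0);
    }
    return min;
  }

  /** MAX over the encoded values (true = 1, false = 0). */
  static int maxEncoded(boolean[] values) {
    int max = 0;
    for (boolean v : values) {
      max = Math.max(max, v ? 1 : 0);
    }
    return max;
  }
}
```

For any non-empty input, `boolAnd(v) == (minEncoded(v) == 1)` and `boolOr(v) == (maxEncoded(v) == 1)`.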

// Frame literals come in the constants from the LogicalWindow and the bound.getOffset() stores the
// InputRef to the constants array offset by the input array length. These need to be extracted here and
// set to the bounds.
validateFrameBounds(windowGroup.lowerBound, windowGroup.upperBound, windowGroup.isRows);
Contributor

commented previously: not sure isRows is a clear enough flag. let's name it isRowBased or use an enum

enum WindowBase {
  ROW, RANGE
}

Contributor Author

@somandal somandal Mar 3, 2023

done - I renamed it here, and store it as an enum in WindowAggregateOperator. If it's okay I'll create a util to store the enum in a common place for use in both places later as a followup?

Contributor Author

never mind, updated this to add the enum directly into WindowNode and access it in WindowAggregateOperator.
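A rough sketch of the enum-based alternative, reusing the reviewer's suggested `WindowBase` name. The enclosing class and the `fromIsRows` helper are hypothetical, and the actual names used in WindowNode may differ.

```java
/** Illustrative only: replacing the boolean isRows flag with an enum. */
final class WindowBaseSketch {

  /** Frame type as suggested in the review: ROW-based or RANGE-based. */
  enum WindowBase {
    ROW, RANGE
  }

  /** Hypothetical bridge from the planner's boolean flag to the enum. */
  static WindowBase fromIsRows(boolean isRows) {
    return isRows ? WindowBase.ROW : WindowBase.RANGE;
  }

  /** Phase 1 check expressed against the enum instead of a negated boolean. */
  static void requireRange(WindowBase base) {
    if (base != WindowBase.RANGE) {
      throw new IllegalStateException("Only RANGE type frames are supported at present");
    }
  }
}
```

Compared with `!isRows()`, a check such as `base != WindowBase.RANGE` states the supported frame type directly at the call site.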

Comment on lines 33 to 37
/**
* Utility class to perform aggregations in the intermediate stage operators such as {@code AggregateOperator} and
* {@code WindowAggregateOperator}
*/
public class AggregationUtils {
Contributor

SWEET refactoring!!! Thank you!!!

suggest javadoc

Suggested change
- /**
- * Utility class to perform aggregations in the intermediate stage operators such as {@code AggregateOperator} and
- * {@code WindowAggregateOperator}
- */
- public class AggregationUtils {
+ /**
+ * Utility class to perform accumulation over a collection of rows. It provides util to
+ * (1) method to deal with aggregation key and (2) method to merge a row into an existing accumulator
+ *
+ * <p>Accumulation is used by {@code WindowAggregateOperator} and {@code AggregateOperator}.
+ */
+ public class AggregationUtils {

Contributor Author

done

@somandal somandal requested a review from walterddr March 3, 2023 04:43
@somandal
Contributor Author

somandal commented Mar 3, 2023

lgtm with some more comments. thank you @somandal for the contribution. this is a huge effort and we definitely appreciate the patience and consideration during the review/comment process.

Thanks @walterddr! Appreciate the thorough review and the guidance on this from all involved.
I've addressed all comments. PTAL when you get a chance

Labels

feature
multi-stage (Related to the multi-stage query engine)
window-functions (Related to SQL window functions on the multi-stage query engine)

7 participants