ASOF JOIN #15630
Codecov Report
@@ Coverage Diff @@
## master #15630 +/- ##
============================================
+ Coverage 62.90% 63.38% +0.48%
+ Complexity 1386 1354 -32
============================================
Files 2867 2896 +29
Lines 163354 166074 +2720
Branches 24952 25403 +451
============================================
+ Hits 102755 105267 +2512
+ Misses 52847 52837 -10
- Partials 7752 7970 +218
    Preconditions.checkState(
        matchKeys.size() == 2 && matchKeys.get(0) instanceof RexExpression.InputRef
            && matchKeys.get(1) instanceof RexExpression.InputRef,
        "ASOF_JOIN operator only supports match conditions with a comparison between two columns of the same type");
How is this checking that both columns are of the same type? I'm only seeing it checking they are instances of type InputRef, but maybe I'm missing something.
The TODO above mentions the reason - basically, if the two columns are of different but coercible types, Calcite will introduce a cast function call on one side of the match condition. So one of the keys will no longer be an InputRef, but a cast FunctionCall (with the InputRef being its first operand).
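To make the point above concrete, here is a minimal sketch of how a match key could be classified as either a bare column reference or a cast wrapped around one. The `Expr`, `InputRef`, and `FunctionCall` types below are simplified, hypothetical stand-ins written for this example; they are not Pinot's `RexExpression` classes, and the function name `"CAST"` is an assumption.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for illustration only; NOT Pinot's RexExpression classes.
final class MatchKeySketch {
  interface Expr {}

  // A reference to a column by its position in the input row.
  static final class InputRef implements Expr {
    final int index;

    InputRef(int index) {
      this.index = index;
    }
  }

  // A function call with a name and a list of operand expressions.
  static final class FunctionCall implements Expr {
    final String name;
    final List<Expr> operands;

    FunctionCall(String name, List<Expr> operands) {
      this.name = name;
      this.operands = operands;
    }
  }

  // Helper that wraps an expression in a single-operand CAST call.
  static FunctionCall cast(Expr operand) {
    return new FunctionCall("CAST", Collections.<Expr>singletonList(operand));
  }

  // True only when the key is a plain column reference (the case the precondition accepts).
  static boolean isBareColumn(Expr key) {
    return key instanceof InputRef;
  }

  // Returns the column index for InputRef or CAST(InputRef, ...), or -1 for anything else.
  static int underlyingColumnIndex(Expr key) {
    if (key instanceof InputRef) {
      return ((InputRef) key).index;
    }
    if (key instanceof FunctionCall) {
      FunctionCall call = (FunctionCall) key;
      if ("CAST".equals(call.name) && !call.operands.isEmpty()
          && call.operands.get(0) instanceof InputRef) {
        return ((InputRef) call.operands.get(0)).index;
      }
    }
    return -1;
  }
}
```

With these stand-ins, a key of the form `cast(new InputRef(2))` fails the `isBareColumn` check even though a column index can still be recovered from its first operand, which is the situation the TODO describes.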
Pull Request Overview
This PR adds support for ASOF JOIN (and LEFT ASOF JOIN) in the multi‐stage engine following semantics similar to Snowflake’s implementation. Key changes include:
- Introducing a new AsofJoinOperator that uses a tree map for binary search based closest match computation.
- Refactoring existing join operator base classes to factor out common logic via new abstract methods.
- Updating the planning, serialization, and deserialization logic to support the new join type and syntax.
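The tree map based closest-match lookup mentioned in the first bullet can be sketched roughly as follows. This is not the code from `AsofJoinOperator`; the class name, the `MatchOp` enum, and the convention that the left column appears on the left-hand side of the comparison are assumptions made for this example. It relies only on `java.util.TreeMap` navigation methods.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class AsofLookupSketch {
  // The four comparison operators, read as: leftKey <op> rightKey.
  enum MatchOp { GT, GTE, LT, LTE }

  // Returns the value of the closest right-side entry satisfying (leftKey op rightKey),
  // or null when no right-side entry qualifies.
  static <K extends Comparable<K>, V> V closest(TreeMap<K, V> right, K leftKey, MatchOp op) {
    Map.Entry<K, V> entry;
    switch (op) {
      case GT:
        entry = right.lowerEntry(leftKey);   // greatest right key strictly less than leftKey
        break;
      case GTE:
        entry = right.floorEntry(leftKey);   // greatest right key less than or equal to leftKey
        break;
      case LT:
        entry = right.higherEntry(leftKey);  // smallest right key strictly greater than leftKey
        break;
      case LTE:
        entry = right.ceilingEntry(leftKey); // smallest right key greater than or equal to leftKey
        break;
      default:
        throw new IllegalArgumentException("Unsupported operator: " + op);
    }
    return entry == null ? null : entry.getValue();
  }
}
```

Each lookup is logarithmic in the number of right-side entries, which is where the "binary search based" description comes from.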
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| PlanNodeToOpChain.java | Updated join strategy handling to add a new case for ASOF JOIN. |
| NonEquiJoinOperator.java & HashJoinOperator.java | Removed inline right table building logic and refactored to delegate to new methods in BaseJoinOperator. |
| BaseJoinOperator.java | Refactored right table building and introduced abstract methods for adding and finishing table construction. |
| AsofJoinOperator.java | Added new operator implementing ASOF JOIN semantics using a tree map for closest match. |
| PlanNodeSerializer.java, PlanNodeDeserializer.java, JoinNode.java | Updated to include match condition and join strategy changes. |
| RelToPlanNodeConverter.java & PlanNodeToRelConverter.java | Added conversion support for logical ASOF JOIN nodes. |
| PinotJoinExchangeNodeInsertRule.java, plan.proto | Updated to support the new join types at the distribution and proto levels. |
Comments suppressed due to low confidence (1)
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java:310
- The conversion mapping renames the join strategy from ASOF in code to AS_OF in the proto. For clearer consistency across modules, consider standardizing the naming convention for the ASOF join strategy.
case AS_OF:
            && matchKeys.get(1) instanceof RexExpression.InputRef,
        "ASOF_JOIN operator only supports match conditions with a comparison between two columns of the same type");
    _leftMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(0)).getIndex();
    _rightMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(1)).getIndex() - leftSchema.size();
Copilot AI, May 26, 2025
Clarify in the comments that subtracting leftSchema.size() is based on the assumption that the right input's column indices start immediately after the left schema. This will help future maintainers understand the rationale behind the index adjustment.
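The index adjustment can be illustrated with a small sketch, assuming (as the comment above does) that column indices in the join condition address the concatenation of the left schema followed by the right schema. The class and method names here are hypothetical.

```java
// Hypothetical helper illustrating the index adjustment; not code from the PR.
final class MatchKeyIndexSketch {
  // Converts an index into the concatenated (left + right) row into an index
  // that is local to the right input.
  static int toRightLocalIndex(int joinedIndex, int leftColumnCount) {
    if (joinedIndex < leftColumnCount) {
      throw new IllegalArgumentException(
          "Index " + joinedIndex + " refers to a left-side column, not a right-side one");
    }
    return joinedIndex - leftColumnCount;
  }
}
```

For example, with a two-column left schema and a three-column right schema, index 3 in the concatenated row corresponds to index 1 in the right input.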
    // TODO: Add support for MATCH_CONDITION containing two columns of different types. In that case, there would be
    // a CAST RexExpression.FunctionCall on top of the RexExpression.InputRef, and we'd need to add the
    // appropriate type casts to make sure that the Comparable based comparisons in this class don't throw
    // ClassCastException.
Aren't we able to fail during planning in this case?
Good point, I've moved all the checks from this operator into the planning phase (I hadn't realized that some of the checks were duplicated across both the places).
- This PR adds support for `ASOF JOIN` (and `LEFT ASOF JOIN`) in the multi-stage engine with syntax and semantics similar to Snowflake's - https://docs.snowflake.com/en/sql-reference/constructs/asof-join.
- Calcite added support for ASOF joins in `1.38.0` - [CALCITE-6372] Support ASOF joins calcite#3883 (we recently upgraded from `1.37.0` to `1.39.0` - Upgrade Calcite to 1.39.0 #15263).
- Calcite doesn't support `ASOF JOIN`s without an `ON` clause even though Snowflake does - https://docs.snowflake.com/en/sql-reference/constructs/asof-join#parameters. Currently, the workaround would be to use a clause like `ON TRUE` - [CALCITE-6372] Support ASOF joins calcite#3883 (comment). Note that Calcite enforces the `ON` clause in these joins to be a conjunction of equalities (apart from literal conditions).
- The `MATCH_CONDITION` clause is mandatory (in both Calcite and Snowflake).
- We don't use the `MATCH_CONDITION` to determine the exchange / distribution logic here in Pinot; we only use the actual join keys (from the `ON` clause) to implement the usual hash exchange. The reason is that `MATCH_CONDITION` can only be one of `>`, `>=`, `<`, `<=`, and the semantics of the join (finding the closest match) mean that we can't use buckets to distribute the data accurately either. If the clause is `ON TRUE`, we'll use a random + broadcast distribution strategy, similar to regular joins without equality based join conditions.
- This PR refactors `BaseJoinOperator` and existing implementations to increase reused logic and adds a new `AsofJoinOperator` implementation that uses a tree map for efficient (binary search based) computation of the closest match as per the `MATCH_CONDITION`.
- [...] `ASOF JOIN` queries, the test cases added here only have manually validated logical correctness.
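The distribution choice described above can be sketched as follows. This is only an illustration of the idea under stated assumptions: the class, enum, and method names are hypothetical and do not correspond to Pinot's exchange or planner APIs, and rows are modeled as plain `Object[]` arrays.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the exchange decision; not Pinot's planner API.
final class AsofExchangeSketch {
  enum Distribution { HASH_ON_JOIN_KEYS, RANDOM_PLUS_BROADCAST }

  // Only the equality keys from the ON clause drive the choice; the MATCH_CONDITION
  // column is deliberately not an input here.
  static Distribution choose(List<Integer> onClauseKeyIndexes) {
    return onClauseKeyIndexes.isEmpty()
        ? Distribution.RANDOM_PLUS_BROADCAST // e.g. ON TRUE: no equality keys to hash on
        : Distribution.HASH_ON_JOIN_KEYS;
  }

  // Hash partitioning on the ON-clause keys: rows with equal join keys land in the
  // same partition regardless of the value of their match column.
  static int partition(Object[] row, List<Integer> keyIndexes, int numPartitions) {
    Object[] keyValues = new Object[keyIndexes.size()];
    for (int i = 0; i < keyValues.length; i++) {
      keyValues[i] = row[keyIndexes.get(i)];
    }
    return Math.floorMod(Arrays.hashCode(keyValues), numPartitions);
  }
}
```

Because the partition depends only on the join keys, every right-side candidate for a given left row ends up on the same worker, so the closest match can be computed locally.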