Skip to content

Conversation

@yashmayya
Copy link
Contributor

  • Adds support for ASOF JOIN (and LEFT ASOF JOIN) in the multi-stage engine with syntax and semantics similar to Snowflake's - https://docs.snowflake.com/en/sql-reference/constructs/asof-join.
  • Calcite recently added support for this join type (in the parser, validator, and rel logic) in 1.38.0 - [CALCITE-6372] Support ASOF joins calcite#3883 (we recently upgraded from 1.37.0 to 1.39.0 - Upgrade Calcite to 1.39.0 #15263).
  • Note that Calcite currently doesn't support ASOF JOINs without an ON clause even though Snowflake does - https://docs.snowflake.com/en/sql-reference/constructs/asof-join#parameters. Currently, the workaround would be to use a clause like ON TRUE - [CALCITE-6372] Support ASOF joins calcite#3883 (comment). Note that Calcite enforces the ON clause in these joins to be a conjunction of equalities (apart from literal conditions).
  • The MATCH_CONDITION clause is mandatory (in both Calcite and Snowflake).
  • We don't use the MATCH_CONDITION to determine the exchange / distribution logic here in Pinot, we only use the actual join keys (from the ON clause) to implement the usual hash exchange. The reason is that MATCH_CONDITION can only be one out of >, >=, <, <= and the semantics of the join (finding the closest match) make it such that we can't use buckets to distribute the data accurately either. The hash exchange will be based on the join keys from the ON clause. If the clause is ON TRUE, we'll use a random + broadcast distribution strategy, similar to regular joins without equality based join conditions.
  • This patch also refactors the physical BaseJoinOperator and existing implementations to increase reused logic and adds a new AsofJoinOperator implementation that uses a tree map for efficient (binary search based) computation of the closest match as per the MATCH_CONDITION.
  • Since H2 doesn't support ASOF JOIN queries, the test cases added here only have manually validated logical correctness.

@yashmayya yashmayya added feature release-notes Referenced by PRs that need attention when compiling the next release notes multi-stage Related to the multi-stage query engine labels Apr 24, 2025
@codecov-commenter
Copy link

codecov-commenter commented Apr 24, 2025

Codecov Report

❌ Patch coverage is 79.01235% with 34 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.38%. Comparing base (1a476de) to head (aab2f94).
⚠️ Report is 859 commits behind head on master.

Files with missing lines Patch % Lines
.../query/planner/logical/RelToPlanNodeConverter.java 80.64% 0 Missing and 6 partials ⚠️
...pinot/query/runtime/operator/AsofJoinOperator.java 89.65% 5 Missing and 1 partial ⚠️
.../query/planner/logical/PlanNodeToRelConverter.java 0.00% 5 Missing ⚠️
.../pinot/query/planner/serde/PlanNodeSerializer.java 37.50% 4 Missing and 1 partial ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 33.33% 3 Missing and 1 partial ⚠️
...he/pinot/query/runtime/plan/PlanNodeToOpChain.java 57.14% 2 Missing and 1 partial ⚠️
.../apache/pinot/query/planner/plannode/JoinNode.java 75.00% 0 Missing and 2 partials ⚠️
...ot/query/runtime/operator/NonEquiJoinOperator.java 50.00% 1 Missing and 1 partial ⚠️
...pinot/query/runtime/operator/BaseJoinOperator.java 96.29% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15630      +/-   ##
============================================
+ Coverage     62.90%   63.38%   +0.48%     
+ Complexity     1386     1354      -32     
============================================
  Files          2867     2896      +29     
  Lines        163354   166074    +2720     
  Branches      24952    25403     +451     
============================================
+ Hits         102755   105267    +2512     
+ Misses        52847    52837      -10     
- Partials       7752     7970     +218     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.33% <79.01%> (+0.46%) ⬆️
java-21 63.33% <79.01%> (+0.51%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.38% <79.01%> (+0.48%) ⬆️
unittests 63.38% <79.01%> (+0.48%) ⬆️
unittests1 56.44% <79.01%> (+0.61%) ⬆️
unittests2 33.38% <0.00%> (-0.19%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya yashmayya marked this pull request as ready for review April 28, 2025 13:32
Preconditions.checkState(
matchKeys.size() == 2 && matchKeys.get(0) instanceof RexExpression.InputRef
&& matchKeys.get(1) instanceof RexExpression.InputRef,
"ASOF_JOIN operator only supports match conditions with a comparison between two columns of the same type");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this checking that both columns are of the same type? I'm only seeing it checking they are instances of type InputRef, but maybe I'm missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO above mentions the reason - basically, if the two columns are of different but coercible types, Calcite will introduce a cast function call on one side of the match condition. So one of the keys will no longer be an InputRef, but a cast FunctionCall (with the InputRef being its first operand).

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for ASOF JOIN (and LEFT ASOF JOIN) in the multi‐stage engine following semantics similar to Snowflake’s implementation. Key changes include:

  • Introducing a new AsofJoinOperator that uses a tree map for binary search based closest match computation.
  • Refactoring existing join operator base classes to factor out common logic via new abstract methods.
  • Updating the planning, serialization, and deserialization logic to support the new join type and syntax.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
PlanNodeToOpChain.java Updated join strategy handling to add a new case for ASOF JOIN.
NonEquiJoinOperator.java & HashJoinOperator.java Removed inline right table building logic and refactored to delegate to new methods in BaseJoinOperator.
BaseJoinOperator.java Refactored right table building and introduced abstract methods for adding and finishing table construction.
AsofJoinOperator.java Added new operator implementing ASOF JOIN semantics using a tree map for closest match.
PlanNodeSerializer.java, PlanNodeDeserializer.java, JoinNode.java Updated to include match condition and join strategy changes.
RelToPlanNodeConverter.java & PlanNodeToRelConverter.java Added conversion support for logical ASOF JOIN nodes.
PinotJoinExchangeNodeInsertRule.java, plan.proto Updated to support the new join types at the distribution and proto levels.
Comments suppressed due to low confidence (1)

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java:310

  • The conversion mapping renames the join strategy from ASOF in code to AS_OF in the proto. For clearer consistency across modules, consider standardizing the naming convention for the ASOF join strategy.
case AS_OF:

&& matchKeys.get(1) instanceof RexExpression.InputRef,
"ASOF_JOIN operator only supports match conditions with a comparison between two columns of the same type");
_leftMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(0)).getIndex();
_rightMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(1)).getIndex() - leftSchema.size();
Copy link

Copilot AI May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify in the comments that subtracting leftSchema.size() is based on the assumption that the right input's column indices start immediately after the left schema. This will help future maintainers understand the rationale behind the index adjustment.

Copilot uses AI. Check for mistakes.
Comment on lines 71 to 74
// TODO: Add support for MATCH_CONDITION containing two columns of different types. In that case, there would be
// a CAST RexExpression.FunctionCall on top of the RexExpression.InputRef, and we'd need to add the
// appropriate type casts to make sure that the Comparable based comparisons in this class don't throw
// ClassCastException.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we able to fail during planning in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I've moved all the checks from this operator into the planning phase (I hadn't realized that some of the checks were duplicated across both the places).

@yashmayya yashmayya merged commit 767cb9b into apache:master May 28, 2025
18 checks passed
songwdfu pushed a commit to songwdfu/pinot that referenced this pull request Jun 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature multi-stage Related to the multi-stage query engine release-notes Referenced by PRs that need attention when compiling the next release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants