Skip to content

Conversation

@haohuaijin
Copy link
Collaborator

@haohuaijin haohuaijin commented Oct 22, 2025

related to #8863

@github-actions github-actions bot added the ☢️ Bug Something isn't working label Oct 22, 2025
@github-actions
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Swap Guard Logic

The new condition prevents swapping when the hash join mode is PartitionMode::CollectLeft. Verify that this guard covers all non-swappable cases (e.g., future modes akin to CollectLeft) and that it aligns with HashJoinExec::swap_inputs expectations to avoid incorrect plans for semi/anti joins.

// If left is not aggregate but right is aggregate, swap them
if !is_aggregate_exec(left)
    && is_aggregate_exec(right)
    && hash_join.join_type().supports_swap()
    && hash_join.mode != PartitionMode::CollectLeft
{
    return Ok(Transformed::yes(HashJoinExec::swap_inputs(
        hash_join,
        hash_join.mode,
Test Plan Stability

Tests assert exact get_plan_string outputs with detailed operator trees. These are brittle against minor upstream DataFusion changes. Consider asserting key fragments (e.g., join type/mode and critical operators) to reduce flakiness.

    let expected = vec![
        "ProjectionExec: expr=[count(Int64(1))@0 as count(*)]",
        "  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]",
        "    CoalescePartitionsExec",
        "      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]",
        "        ProjectionExec: expr=[]",
        "          CoalesceBatchesExec: target_batch_size=8192",
        "            HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(name@0, name@0)]",
        "              AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "                CoalesceBatchesExec: target_batch_size=8192",
        "                  RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "                    AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "                      CooperativeExec",
        "                        NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
        "              CoalesceBatchesExec: target_batch_size=8192",
        "                RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "                  CooperativeExec",
        "                    NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));

    Ok(())
}

// Verifies that an INTERSECT query (planned as a LeftSemi hash join) keeps
// its original input order after JoinReorderRule runs: the aggregated
// (distinct) branch stays on the build side of the join.
#[tokio::test]
async fn test_join_reorder_intersect() -> Result<()> {
    // Two-column schema shared by both sides of the INTERSECT.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Session with 12 target partitions and the join-reorder rule installed.
    let session_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .build();
    let ctx = SessionContext::new_with_state(session_state);
    ctx.register_table(
        "t",
        Arc::new(NewEmptyTable::new("t", schema.clone()).with_partitions(12)),
    )
    .unwrap();

    let query = "SELECT name FROM t WHERE _timestamp > 1000 INTERSECT SELECT name FROM t WHERE _timestamp < 2000";
    let logical_plan = ctx.state().create_logical_plan(query).await?;
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // The LeftSemi join's left (build) side must remain the aggregated branch.
    let expected = vec![
        "CoalesceBatchesExec: target_batch_size=8192",
        "  HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(name@0, name@0)]",
        "    AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "      CoalesceBatchesExec: target_batch_size=8192",
        "        RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "          AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "            CoalesceBatchesExec: target_batch_size=8192",
        "              FilterExec: _timestamp@0 > 1000, projection=[name@1]",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp > Int64(1000)\"]",
        "    CoalesceBatchesExec: target_batch_size=8192",
        "      RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "        CoalesceBatchesExec: target_batch_size=8192",
        "          FilterExec: _timestamp@0 < 2000, projection=[name@1]",
        "            CooperativeExec",
        "              NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp < Int64(2000)\"]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));

    Ok(())
}

// Verifies that an EXCEPT query (planned as a LeftAnti hash join) keeps its
// original input order after JoinReorderRule runs: the aggregated (distinct)
// branch stays on the build side of the join.
#[tokio::test]
async fn test_join_reorder_except() -> Result<()> {
    // Two-column schema shared by both sides of the EXCEPT.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Session with 12 target partitions and the join-reorder rule installed.
    let session_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .build();
    let ctx = SessionContext::new_with_state(session_state);
    ctx.register_table(
        "t",
        Arc::new(NewEmptyTable::new("t", schema.clone()).with_partitions(12)),
    )
    .unwrap();

    let query = "SELECT name FROM t WHERE _timestamp > 1000 EXCEPT SELECT name FROM t WHERE _timestamp < 2000";
    let logical_plan = ctx.state().create_logical_plan(query).await?;
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // The LeftAnti join's left (build) side must remain the aggregated branch.
    let expected = vec![
        "CoalesceBatchesExec: target_batch_size=8192",
        "  HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]",
        "    AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "      CoalesceBatchesExec: target_batch_size=8192",
        "        RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "          AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "            CoalesceBatchesExec: target_batch_size=8192",
        "              FilterExec: _timestamp@0 > 1000, projection=[name@1]",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp > Int64(1000)\"]",
        "    CoalesceBatchesExec: target_batch_size=8192",
        "      RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "        CoalesceBatchesExec: target_batch_size=8192",
        "          FilterExec: _timestamp@0 < 2000, projection=[name@1]",
        "            CooperativeExec",
        "              NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp < Int64(2000)\"]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));
    Ok(())
}

// Verifies join reordering for an IN-subquery under the deduplication setup:
// LimitJoinRightSide plus the OpenObserve query planner produce a CollectLeft
// hash join whose build side is the deduplicated/limited branch, and
// JoinReorderRule must leave a CollectLeft join's inputs unswapped.
#[tokio::test]
async fn test_join_reorder_dedup() -> Result<()> {
    // Two-column schema used by both the outer query and the subquery.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Session with 12 target partitions, the right-side limit rule, the
    // join-reorder rule, and the custom query planner installed.
    let session_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_optimizer_rule(Arc::new(LimitJoinRightSide::new(50_000)))
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .with_query_planner(Arc::new(OpenobserveQueryPlanner::new()))
        .build();
    let ctx = SessionContext::new_with_state(session_state);
    ctx.register_table(
        "t",
        Arc::new(NewEmptyTable::new("t", schema.clone()).with_partitions(12)),
    )
    .unwrap();

    let query = "SELECT count(*) from t where name in (select distinct name from t)";
    let logical_plan = ctx.state().create_logical_plan(query).await?;
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // CollectLeft join: the deduplicated branch must stay on the left.
    let expected = vec![
        "ProjectionExec: expr=[count(Int64(1))@0 as count(*)]",
        "  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]",
        "    CoalescePartitionsExec",
        "      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]",
        "        ProjectionExec: expr=[]",
        "          CoalesceBatchesExec: target_batch_size=8192",
        "            HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(name@0, name@0)]",
        "              DeduplicationExec: columns: [name@0]",
        "                SortExec: TopK(fetch=50000), expr=[name@0 DESC NULLS LAST], preserve_partitioning=[false]",
        "                  CoalescePartitionsExec",
        "                    AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[], lim=[50000]",
        "                      CoalesceBatchesExec: target_batch_size=8192",
        "                        RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "                          AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[], lim=[50000]",
        "                            CooperativeExec",
        "                              NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
        "              CooperativeExec",
        "                NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));

    Ok(())
}

// Verifies that an INTERSECT query under the deduplication setup produces a
// CollectLeft RightSemi hash join and that JoinReorderRule does not swap its
// inputs: the deduplicated branch must remain the collected (left) side.
#[tokio::test]
async fn test_join_reorder_intersect_dedup() -> Result<()> {
    // Two-column schema shared by both sides of the INTERSECT.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Session with 12 target partitions, the right-side limit rule, the
    // join-reorder rule, and the custom query planner installed.
    let session_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_optimizer_rule(Arc::new(LimitJoinRightSide::new(50_000)))
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .with_query_planner(Arc::new(OpenobserveQueryPlanner::new()))
        .build();
    let ctx = SessionContext::new_with_state(session_state);
    ctx.register_table(
        "t",
        Arc::new(NewEmptyTable::new("t", schema.clone()).with_partitions(12)),
    )
    .unwrap();

    let query = "SELECT name FROM t WHERE _timestamp > 1000 INTERSECT SELECT name FROM t WHERE _timestamp < 2000";
    let logical_plan = ctx.state().create_logical_plan(query).await?;
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // CollectLeft RightSemi join: inputs must not be swapped by the rule.
    let expected = vec![
        "CoalesceBatchesExec: target_batch_size=8192",
        "  HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(name@0, name@0)]",
        "    ProjectionExec: expr=[name@1 as name]",
        "      DeduplicationExec: columns: [name@1]",
        "        SortExec: expr=[name@1 DESC NULLS LAST, _timestamp@0 DESC NULLS LAST], preserve_partitioning=[false]",
        "          SortPreservingMergeExec: [_timestamp@0 DESC NULLS LAST], fetch=50000",
        "            CoalesceBatchesExec: target_batch_size=8192, fetch=50000",
        "              FilterExec: _timestamp@0 < 2000",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp < Int64(2000)\"], sorted_by_time=true",
        "    AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "      CoalesceBatchesExec: target_batch_size=8192",
        "        RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "          AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "            CoalesceBatchesExec: target_batch_size=8192",
        "              FilterExec: _timestamp@0 > 1000, projection=[name@1]",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp > Int64(1000)\"]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));

    Ok(())
}

// Verifies that an EXCEPT query under the deduplication setup produces a
// CollectLeft RightAnti hash join and that JoinReorderRule does not swap its
// inputs: the deduplicated branch must remain the collected (left) side.
#[tokio::test]
async fn test_join_reorder_except_dedup() -> Result<()> {
    // Two-column schema shared by both sides of the EXCEPT.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Session with 12 target partitions, the right-side limit rule, the
    // join-reorder rule, and the custom query planner installed.
    let session_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_optimizer_rule(Arc::new(LimitJoinRightSide::new(50_000)))
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .with_query_planner(Arc::new(OpenobserveQueryPlanner::new()))
        .build();
    let ctx = SessionContext::new_with_state(session_state);
    ctx.register_table(
        "t",
        Arc::new(NewEmptyTable::new("t", schema.clone()).with_partitions(12)),
    )
    .unwrap();

    let query = "SELECT name FROM t WHERE _timestamp > 1000 EXCEPT SELECT name FROM t WHERE _timestamp < 2000";
    let logical_plan = ctx.state().create_logical_plan(query).await?;
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // CollectLeft RightAnti join: inputs must not be swapped by the rule.
    let expected = vec![
        "CoalesceBatchesExec: target_batch_size=8192",
        "  HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(name@0, name@0)]",
        "    ProjectionExec: expr=[name@1 as name]",
        "      DeduplicationExec: columns: [name@1]",
        "        SortExec: expr=[name@1 DESC NULLS LAST, _timestamp@0 DESC NULLS LAST], preserve_partitioning=[false]",
        "          SortPreservingMergeExec: [_timestamp@0 DESC NULLS LAST], fetch=50000",
        "            CoalesceBatchesExec: target_batch_size=8192, fetch=50000",
        "              FilterExec: _timestamp@0 < 2000",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp < Int64(2000)\"], sorted_by_time=true",
        "    AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "      CoalesceBatchesExec: target_batch_size=8192",
        "        RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "          AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "            CoalesceBatchesExec: target_batch_size=8192",
        "              FilterExec: _timestamp@0 > 1000, projection=[name@1]",
        "                CooperativeExec",
        "                  NewEmptyExec: name=\"t\", projection=[\"_timestamp\", \"name\"], filters=[\"_timestamp > Int64(1000)\"]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));
    Ok(())
}
Provider Semantics

Replacing MemTable with NewEmptyTable changes source behavior (empty inputs, partitioning, filters formatting). Ensure this still exercises the intended optimizer paths and does not mask regressions that would appear with non-empty data.

    let provider = NewEmptyTable::new("t", schema.clone()).with_partitions(12);
    ctx.register_table("t", Arc::new(provider)).unwrap();

    let sql = "SELECT count(*) from t where name in (select distinct name from t)";
    let plan = ctx.state().create_logical_plan(sql).await?;
    let physical_plan = ctx.state().create_physical_plan(&plan).await?;

    let expected = vec![
        "ProjectionExec: expr=[count(Int64(1))@0 as count(*)]",
        "  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]",
        "    CoalescePartitionsExec",
        "      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]",
        "        ProjectionExec: expr=[]",
        "          CoalesceBatchesExec: target_batch_size=8192",
        "            HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(name@0, name@0)]",
        "              AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
        "                CoalesceBatchesExec: target_batch_size=8192",
        "                  RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "                    AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
        "                      CooperativeExec",
        "                        NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
        "              CoalesceBatchesExec: target_batch_size=8192",
        "                RepartitionExec: partitioning=Hash([name@0], 12), input_partitions=12",
        "                  CooperativeExec",
        "                    NewEmptyExec: name=\"t\", projection=[\"name\"], filters=[]",
    ];

    assert_eq!(expected, get_plan_string(&physical_plan));

    Ok(())
}

#[tokio::test]
async fn test_join_reorder_intersect() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(12))
        .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build().unwrap()))
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))
        .build();
    let ctx = SessionContext::new_with_state(state);
    let provider = NewEmptyTable::new("t", schema.clone()).with_partitions(12);
    ctx.register_table("t", Arc::new(provider)).unwrap();

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Prevent unsafe swaps for collect modes

Guard the swap by excluding all collect-side modes, not just CollectLeft. If other
collect modes (e.g., CollectRight) exist or get added, this condition could allow an
unsafe swap and produce incorrect plans. Compare against a non-partitioned/collect
class or explicitly disallow all collect variants.

src/service/search/datafusion/optimizer/physical_optimizer/join_reorder.rs [84-92]

 if !is_aggregate_exec(left)
     && is_aggregate_exec(right)
     && hash_join.join_type().supports_swap()
-    && hash_join.mode != PartitionMode::CollectLeft
+    && !matches!(hash_join.mode, PartitionMode::CollectLeft | PartitionMode::CollectRight)
 {
     return Ok(Transformed::yes(HashJoinExec::swap_inputs(
         hash_join,
         hash_join.mode,
Suggestion importance[1-10]: 6

__

Why: The added guard hash_join.mode != PartitionMode::CollectLeft is present at line 87; extending it to disallow other collect variants is a reasonable forward-safe improvement, though speculative since CollectRight may not exist. The change is minor but could prevent incorrect plans if additional collect modes are introduced.

Low
General
Improve test failure diagnostics

Use assert_eq! with a helpful diff on failure to ease diagnosing plan drift, as
exact string matching across versions can be brittle. Include the actual plan in the
failure message to quickly identify mismatches in CI.

src/service/search/datafusion/optimizer/physical_optimizer/join_reorder.rs [158-159]

-assert_eq!(expected, get_plan_string(&physical_plan));
+assert_eq!(expected, get_plan_string(&physical_plan), "Physical plan mismatch:\nexpected:\n{:#?}\nactual:\n{:#?}", expected, get_plan_string(&physical_plan));
Suggestion importance[1-10]: 5

__

Why: Enhancing assert_eq! to print expected and actual plans improves debuggability with minimal risk, but it doesn’t affect functionality. The existing code line appears in multiple tests; applying this uniformly would aid CI diagnosis.

Low

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Summary

Fixed incorrect join reordering for INTERSECT and EXCEPT SQL operations by preventing join input swapping when PartitionMode::CollectLeft is used.

  • Added condition hash_join.mode != PartitionMode::CollectLeft to the swap guard in swap_join_order function
  • INTERSECT/EXCEPT operations use LeftSemi/LeftAnti joins which are not commutative, so swapping their inputs would break query semantics
  • Added 4 new comprehensive test cases covering INTERSECT/EXCEPT queries both with and without deduplication optimization
  • Refactored test setup to use NewEmptyTable instead of MemTable for cleaner test code

Confidence Score: 5/5

  • This PR is safe to merge - it fixes a correctness bug with comprehensive test coverage
  • The fix adds a single, well-targeted guard condition that prevents incorrect join reordering for INTERSECT/EXCEPT queries. The change is minimal and surgical, only affecting the specific problematic case. Four new test cases thoroughly validate the fix for both INTERSECT and EXCEPT operations with and without deduplication.
  • No files require special attention

Important Files Changed

File Analysis

Filename Score Overview
src/service/search/datafusion/optimizer/physical_optimizer/join_reorder.rs 5/5 Adds guard condition to prevent incorrect join swapping for INTERSECT/EXCEPT queries with CollectLeft mode, includes comprehensive test coverage

Sequence Diagram

sequenceDiagram
    participant QP as Query Planner
    participant JRR as JoinReorderRule
    participant HJE as HashJoinExec
    participant AG as AggregateExec

    QP->>JRR: optimize(plan)
    JRR->>JRR: transform_down(swap_join_order)
    
    alt HashJoin found
        JRR->>HJE: check join_type.supports_swap()
        JRR->>HJE: check mode != CollectLeft
        
        alt Not aggregate on left AND aggregate on right AND swappable AND NOT CollectLeft
            HJE->>HJE: swap_inputs(hash_join, mode)
            HJE-->>JRR: Transformed::yes(swapped_join)
        else Cannot swap (CollectLeft or other reasons)
            HJE-->>JRR: Transformed::no(plan)
        end
    else Not a HashJoin
        JRR-->>QP: Transformed::no(plan)
    end
    
    JRR-->>QP: optimized plan
Loading

1 file reviewed, no comments

Edit Code Review Agent Settings | Greptile

@testdino-playwright-reporter
Copy link

Testdino Test Results

Status Total Passed Failed Skipped Flaky Pass Rate Duration
All tests passed 365 346 0 19 0 95% 5m 14s

View Detailed Results

@testdino-playwright-reporter
Copy link

⚠️ Test Run Unstable


Author: haohuaijin | Branch: fix-join-order | Commit: 6919dd9

Testdino Test Results

Status Total Passed Failed Skipped Flaky Pass Rate Duration
All tests passed 365 344 0 19 2 94% 5m 14s

View Detailed Results

@hengfeiyang hengfeiyang merged commit f3a648d into main Oct 22, 2025
32 checks passed
@hengfeiyang hengfeiyang deleted the fix-join-order branch October 22, 2025 10:13
uddhavdave pushed a commit that referenced this pull request Oct 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

☢️ Bug Something isn't working Review effort 3/5

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants