@haohuaijin
Collaborator

@haohuaijin haohuaijin commented Sep 8, 2025

No description provided.

@github-actions github-actions bot added the ☢️ Bug Something isn't working label Sep 8, 2025
@haohuaijin haohuaijin changed the title from "fix: some join behavior change after upgrade datafusion" to "fix: hash join behavior change after upgrade datafusion" Sep 8, 2025
@github-actions
Contributor

github-actions bot commented Sep 8, 2025

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Panic

Multiple places call visitor.table_name.clone().unwrap() after only checking !visitor.has_remote_scan. If a plan subtree lacks both a RemoteScan and a derivable table name (e.g., projections, filters over non-table sources), this can unwrap None and panic. Validate table_name presence before unwrapping or propagate an error.

fn f_up(&mut self, node: Arc<dyn ExecutionPlan>) -> Result<Transformed<Self::Node>> {
    if node.name() == "RepartitionExec" || node.name() == "CoalescePartitionsExec" {
        let mut visitor = TableNameVisitor::new();
        node.visit(&mut visitor)?;
        if !visitor.has_remote_scan {
            let table_name = visitor.table_name.clone().unwrap();
            let input = node.children()[0];
            let remote_scan = Arc::new(RemoteScanExec::new(
                input.clone(),
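
The "propagate an error" option mentioned above can be sketched as follows. This is a minimal, self-contained illustration, not the PR's code: `PlanError` and `table_to_wrap` are hypothetical names, and the `TableNameVisitor` here is a simplified stand-in that models only the two fields referenced in the review. The real optimizer would return whatever error type it already uses.

```rust
use std::fmt;

/// Hypothetical stand-in for the optimizer's error type.
#[derive(Debug, PartialEq)]
struct PlanError(String);

impl fmt::Display for PlanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "plan error: {}", self.0)
    }
}

impl std::error::Error for PlanError {}

/// Simplified stand-in for `TableNameVisitor` (assumption: only these
/// two fields matter for the decision).
#[derive(Default)]
struct TableNameVisitor {
    has_remote_scan: bool,
    table_name: Option<String>,
}

/// Returns the table to wrap in a remote scan, `Ok(None)` when the subtree
/// already contains one, or an error when no table name could be derived.
fn table_to_wrap(visitor: &TableNameVisitor) -> Result<Option<String>, PlanError> {
    if visitor.has_remote_scan {
        return Ok(None);
    }
    // `ok_or_else` turns the missing name into an error instead of a panic.
    let name = visitor
        .table_name
        .clone()
        .ok_or_else(|| PlanError("no table name found in subtree".to_string()))?;
    Ok(Some(name))
}
```

With this shape, a caller can use `?` on the result and only build the remote scan node when it receives `Some(name)`.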

Partitioning Choice

When wrapping HashJoinExec children, the new code repartitions with RoundRobinBatch, reusing the child's existing partition count. Round-robin ignores the join keys, so it may misalign partitioning and affect join performance. Consider preserving the child's partitioning, hash partitioning on the join keys, or using the session's target partition count instead of a fixed round-robin.

let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in node.children() {
    let mut visitor = TableNameVisitor::new();
    child.visit(&mut visitor)?;
    if !visitor.has_remote_scan {
        let table_name = visitor.table_name.clone().unwrap();
        let remote_scan = Arc::new(RemoteScanExec::new(
            child.clone(),
            self.remote_scan_nodes.get_remote_node(&table_name),
        )?);
        // add repartition for better performance (improve parallelism)
        let output_partitioning = Partitioning::RoundRobinBatch(
            child.output_partitioning().partition_count(),
        );
        let repartition =
            Arc::new(RepartitionExec::try_new(remote_scan, output_partitioning)?);
        new_children.push(repartition);
    } else {
        new_children.push(child.clone());
    }
}
let new_node = node.with_new_children(new_children)?;
self.is_changed = true;
return Ok(Transformed::yes(new_node));
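
To make the concern concrete, here is a toy comparison of the two strategies. It is a sketch under simplifying assumptions: it distributes individual string keys rather than record batches, and `round_robin` and `hash_partition` are hypothetical helpers, not DataFusion functions. Both expect `partitions > 0`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Round-robin: assign items to partitions by arrival order, ignoring the key.
fn round_robin<'a>(keys: &[&'a str], partitions: usize) -> Vec<Vec<&'a str>> {
    let mut out = vec![Vec::new(); partitions];
    for (i, key) in keys.iter().enumerate() {
        out[i % partitions].push(*key);
    }
    out
}

/// Hash partitioning: equal keys always land in the same partition.
fn hash_partition<'a>(keys: &[&'a str], partitions: usize) -> Vec<Vec<&'a str>> {
    let mut out = vec![Vec::new(); partitions];
    for key in keys {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() % partitions as u64) as usize;
        out[idx].push(*key);
    }
    out
}
```

For the input `["a", "a", "b", "b"]` with two partitions, `round_robin` places one `"a"` and one `"b"` in each partition, while `hash_partition` keeps all occurrences of a given key together. Whether that difference matters for the join depends on its partition mode, which the excerpt does not show.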

Visitor Short-Circuiting

TableNameVisitor stops traversal on the first RemoteScanExec. For complex subtrees (e.g., Union with mixed branches), this might miss table names or remote scans in other branches depending on visit order. Confirm traversal visits all children appropriately before deciding has_remote_scan.

impl<'n> TreeNodeVisitor<'n> for TableNameVisitor {
    type Node = Arc<dyn ExecutionPlan>;

    fn f_up(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
        let name = node.name();
        if name == "RemoteScanExec" {
            self.has_remote_scan = true;
            Ok(TreeNodeRecursion::Stop)
        } else if name == "NewEmptyExec" {
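
The order dependence described above can be reproduced on a toy tree. This is an illustrative sketch only: `Plan`, `Found`, and `visit` are hypothetical and much simpler than DataFusion's `TreeNodeVisitor`; the `stop_on_remote_scan` flag mimics returning `TreeNodeRecursion::Stop` at the first remote scan.

```rust
/// Toy plan tree; the variant names are illustrative, not DataFusion types.
enum Plan {
    RemoteScan,
    Table(String),
    Union(Vec<Plan>),
}

/// What the walk has discovered so far.
#[derive(Default, Debug, PartialEq)]
struct Found {
    has_remote_scan: bool,
    table_name: Option<String>,
}

/// Post-order walk. Returns `false` once the whole traversal should stop.
fn visit(plan: &Plan, stop_on_remote_scan: bool, found: &mut Found) -> bool {
    match plan {
        Plan::Union(children) => {
            for child in children {
                if !visit(child, stop_on_remote_scan, found) {
                    return false; // a child asked to stop: skip remaining siblings
                }
            }
            true
        }
        Plan::RemoteScan => {
            found.has_remote_scan = true;
            !stop_on_remote_scan
        }
        Plan::Table(name) => {
            // Keep the first table name encountered.
            if found.table_name.is_none() {
                found.table_name = Some(name.clone());
            }
            true
        }
    }
}
```

For a `Union` whose first branch is a remote scan and whose second branch is a table, the early-stopping walk never reaches the table, so `table_name` stays `None`; the full walk records both. With the branches swapped, both walks see the table first.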

Contributor

@greptile-apps greptile-apps bot left a comment

Greptile Summary

This PR fixes a critical logical bug in the remote scan optimizer that emerged after upgrading DataFusion. The primary change involves renaming and inverting the semantic meaning of a boolean flag from is_remote_scan to has_remote_scan. The original flag used true to indicate "should add remote scan" and false for "already has remote scan". The new flag uses false to mean "does not have remote scan" and true for "already has remote scan", making the logic more intuitive.

All condition checks throughout the optimizer were updated accordingly - changing from if visitor.is_remote_scan to if !visitor.has_remote_scan to maintain the same logical behavior. This ensures that remote scan nodes are still properly added to execution plans that don't already have them.
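
The equivalence claimed above can be checked with a small sketch. It assumes the flag semantics exactly as the summary describes them; `OldVisitor`, `NewVisitor`, `should_wrap_old`, `should_wrap_new`, and `migrate` are hypothetical names used only for illustration.

```rust
/// Old semantics as described in the summary: `true` meant
/// "no remote scan yet, so one should be added".
struct OldVisitor {
    is_remote_scan: bool,
}

/// New semantics: `true` means "a remote scan already exists".
struct NewVisitor {
    has_remote_scan: bool,
}

/// Old condition: `if visitor.is_remote_scan`.
fn should_wrap_old(v: &OldVisitor) -> bool {
    v.is_remote_scan
}

/// New condition: `if !visitor.has_remote_scan`.
fn should_wrap_new(v: &NewVisitor) -> bool {
    !v.has_remote_scan
}

/// Translate a value of the old flag into the new one.
fn migrate(old: &OldVisitor) -> NewVisitor {
    NewVisitor {
        has_remote_scan: !old.is_remote_scan,
    }
}
```

For either value of the old flag, `should_wrap_old(&old)` and `should_wrap_new(&migrate(&old))` agree, which is why negating each condition preserves behavior only if every place that sets the flag is inverted as well.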

Additionally, the PR adds explicit handling for HashJoinExec nodes, which appears to be necessary due to changes in how DataFusion represents join operations after the upgrade. The new logic iterates through each child of a join operation and wraps those without existing remote scans in RemoteScanExec nodes, preserving distributed query execution capabilities.

A minor cleanup removes the #[allow(dead_code)] attribute from the get_count() method, suggesting this method is now being used elsewhere in the codebase.

Confidence score: 4/5

  • This PR addresses critical functionality for distributed query execution with mostly straightforward logic changes
  • Score reflects confidence in the semantic inversion logic and new join handling, though DataFusion upgrade impacts could have subtle effects
  • Pay close attention to the HashJoinExec handling logic and verify all condition inversions are correct

1 file reviewed, 1 comment

@github-actions
Contributor

github-actions bot commented Sep 8, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Guard against missing table name

Avoid unwrapping table_name without checking it is set; some plans (e.g.,
projections/filters) may not yield a table. Return early if table_name is None to
prevent panics. Mirror this guard in all similar blocks.

src/service/search/datafusion/optimizer/physical_optimizer/remote_scan.rs [177-191]

 if !visitor.has_remote_scan {
-    let table_name = visitor.table_name.clone().unwrap();
-    let input = node.children()[0];
-    let remote_scan = Arc::new(RemoteScanExec::new(
-        input.clone(),
-        self.remote_scan_nodes.get_remote_node(&table_name),
-    )?);
-    let output_partitioning =
-        Partitioning::RoundRobinBatch(input.output_partitioning().partition_count());
-    let repartition =
-        Arc::new(RepartitionExec::try_new(remote_scan, output_partitioning)?);
-    let new_node = node.with_new_children(vec![repartition])?;
-    self.is_changed = true;
-    return Ok(Transformed::yes(new_node));
+    if let Some(table_name) = visitor.table_name.clone() {
+        let input = node.children()[0];
+        let remote_scan = Arc::new(RemoteScanExec::new(
+            input.clone(),
+            self.remote_scan_nodes.get_remote_node(&table_name),
+        )?);
+        let output_partitioning =
+            Partitioning::RoundRobinBatch(input.output_partitioning().partition_count());
+        let repartition =
+            Arc::new(RepartitionExec::try_new(remote_scan, output_partitioning)?);
+        let new_node = node.with_new_children(vec![repartition])?;
+        self.is_changed = true;
+        return Ok(Transformed::yes(new_node));
+    }
 }
Suggestion importance[1-10]: 7

Why: Replacing unwrap() with an if let Some(...) avoids potential panics when no table name is found and aligns with the new visitor semantics; it's a solid safety improvement though not necessarily critical.

Impact: Medium

Prevent panic on missing table

Avoid unwrap() on table_name to prevent panics when no table is detected at the top
level. Return the original plan if the table name is absent.

src/service/search/datafusion/optimizer/physical_optimizer/remote_scan.rs [291-299]

 if !visitor.has_remote_scan {
-    let table_name = visitor.table_name.clone().unwrap();
-    let remote_scan = Arc::new(RemoteScanExec::new(
-        plan,
-        remote_scan_nodes.get_remote_node(&table_name),
-    )?);
-    Ok(remote_scan)
+    if let Some(table_name) = visitor.table_name.clone() {
+        let remote_scan = Arc::new(RemoteScanExec::new(
+            plan,
+            remote_scan_nodes.get_remote_node(&table_name),
+        )?);
+        Ok(remote_scan)
+    } else {
+        Ok(plan)
+    }
 } else {
     Ok(plan)
 }
Suggestion importance[1-10]: 7

Why: Avoiding unwrap() in remote_scan_to_top_if_needed prevents a runtime panic when no table is detected; this is a pragmatic robustness fix consistent with the visitor's optional table_name.

Impact: Medium

Preserve child partitioning for joins

Preserve the child's partitioning so the join's distribution is not broken or
skewed. Reuse the original child's output partitioning instead of forcing
RoundRobinBatch, or add the repartition only when more than one partition is
wanted and that is consistent with the join's requirements.

src/service/search/datafusion/optimizer/physical_optimizer/remote_scan.rs [247-271]

 } else if node.name() == "HashJoinExec" {
     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
     for child in node.children() {
         let mut visitor = TableNameVisitor::new();
         child.visit(&mut visitor)?;
         if !visitor.has_remote_scan {
-            let table_name = visitor.table_name.clone().unwrap();
-            let remote_scan = Arc::new(RemoteScanExec::new(
-                child.clone(),
-                self.remote_scan_nodes.get_remote_node(&table_name),
-            )?);
-            // add repartition for better performance(imporve parallelism)
-            let output_partitioning = Partitioning::RoundRobinBatch(
-                child.output_partitioning().partition_count(),
-            );
-            let repartition =
-                Arc::new(RepartitionExec::try_new(remote_scan, output_partitioning)?);
-            new_children.push(repartition);
+            if let Some(table_name) = visitor.table_name.clone() {
+                let remote_scan = Arc::new(RemoteScanExec::new(
+                    child.clone(),
+                    self.remote_scan_nodes.get_remote_node(&table_name),
+                )?);
+                // Preserve original partitioning to avoid unintended shuffle changes
+                let output_partitioning = child.output_partitioning();
+                let repartition = Arc::new(RepartitionExec::try_new(
+                    remote_scan,
+                    output_partitioning.clone(),
+                )?);
+                new_children.push(repartition);
+            } else {
+                // No table discovered; keep child unchanged
+                new_children.push(child.clone());
+            }
         } else {
             new_children.push(child.clone());
         }
     }
     let new_node = node.with_new_children(new_children)?;
     self.is_changed = true;
     return Ok(Transformed::yes(new_node));
 }
Suggestion importance[1-10]: 6

Why: The recommendation to avoid forcing RoundRobin in HashJoinExec children may preserve intended distribution, but correctness depends on broader planning assumptions; it's a reasonable performance/behavior suggestion with moderate impact.

Impact: Low

@hengfeiyang hengfeiyang merged commit 5b907c1 into main Sep 9, 2025
28 of 29 checks passed
@hengfeiyang hengfeiyang deleted the fix-join-behavior branch September 9, 2025 09:37

Labels

☢️ Bug Something isn't working Review effort 3/5

2 participants