@haohuaijin (Collaborator) commented Sep 2, 2025

The scan_stats currently printed are the values accumulated as nodes return their results, but each node should print its own scan_stats.
This change only affects log output.

@github-actions bot (Contributor) commented Sep 2, 2025

PR Reviewer Guide 🔍

(Review updated until commit 20ff213)

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Ownership/Copy

let scan_stats = self.scan_stats; moves the field out of self unless ScanStats is Copy. Because drop takes &mut self, moving a non-Copy field out this way is a compile error. Verify that ScanStats implements Copy, or borrow or clone the field instead.

let scan_stats = self.scan_stats;
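
To make the ownership question concrete, here is a minimal sketch of reading a field by value inside drop. The ScanStats and Stream types below are hypothetical stand-ins (the real definitions and field names are not shown in this thread); the only assumption that matters is whether the stats type derives Copy.

```rust
/// Hypothetical stand-in for the real `ScanStats`; the field names are assumptions.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct ScanStats {
    pub files: i64,
    pub records: i64,
}

/// Hypothetical stand-in for `FlightDecoderStream`.
pub struct Stream {
    pub scan_stats: ScanStats,
    pub label: String, // `String` is not `Copy`
}

impl Drop for Stream {
    fn drop(&mut self) {
        // Compiles because `ScanStats` is `Copy`: this is a copy, not a move.
        let scan_stats = self.scan_stats;
        // `let label = self.label;` would be rejected with E0507, because a
        // non-`Copy` field cannot be moved out from behind `&mut self`.
        let label = &self.label;
        println!("{label}: {scan_stats:?}");
    }
}

/// Reads the stats by value and shows that the field is still usable afterwards.
pub fn copy_out(stream: &mut Stream) -> ScanStats {
    let copied = stream.scan_stats;
    stream.scan_stats.files += 1; // the original field is still intact
    copied
}
```

If the real ScanStats does not derive Copy, the assignment in drop would not compile at all, so a successful build is itself evidence that the copy is happening.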
Concurrency

process_custom_message now mutably updates a local scan_stats and also locks and updates query_context.scan_stats. Ensure concurrent access patterns are safe and that double-accounting is intended. Consider documenting the distinction and ensuring no race when Drop logs local stats while global is updated elsewhere.

pub fn process_custom_message(&mut self, message: CustomMessage) {
    match message {
        CustomMessage::ScanStats(stats) => {
            self.scan_stats.add(&stats);
            self.query_context.scan_stats.lock().add(&stats);
        }

@github-actions bot added the "☢️ Bug" label (Something isn't working) Sep 2, 2025
@github-actions bot (Contributor) commented Sep 2, 2025

PR Code Suggestions ✨

Latest suggestions up to 20ff213
Explore these optional code suggestions:

Category: Possible issue
Suggestion: Handle poisoned locks safely

Guard against potential panics or deadlocks by handling lock poisoning when updating
query_context shared state. Use lock().ok() or try_lock() and early-return on
failure to ensure process_custom_message remains resilient under concurrency stress.

src/service/search/datafusion/distributed_plan/decoder_stream.rs [59-67]

 pub fn process_custom_message(&mut self, message: CustomMessage) {
     match message {
         CustomMessage::ScanStats(stats) => {
             self.scan_stats.add(&stats);
-            self.query_context.scan_stats.lock().add(&stats);
+            if let Ok(mut s) = self.query_context.scan_stats.lock() {
+                s.add(&stats);
+            }
         }
         CustomMessage::Metrics(metrics) => {
-            self.query_context.cluster_metrics.lock().extend(metrics);
+            if let Ok(mut m) = self.query_context.cluster_metrics.lock() {
+                m.extend(metrics);
+            }
         }
         CustomMessage::SetExecutedSegmentIds(ids) => {
-            self.query_context
-                .executed_segment_ids
-                .lock()
-                .extend(ids.into_iter().collect::<Vec<_>>());
+            if let Ok(mut es) = self.query_context.executed_segment_ids.lock() {
+                es.extend(ids.into_iter().collect::<Vec<_>>());
+            }
         }
         _ => {}
     }
 }
Suggestion importance [1-10]: 6

Why: The suggested guarded locking is a reasonable resiliency improvement and aligns with the new mutable method signature and fields; however, it changes error-handling semantics and may silently drop updates, making it a moderate-impact style/robustness change rather than a critical fix.

Impact: Low
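
The trade-off behind the guarded-lock suggestion can be sketched with the standard library alone. This assumes the shared fields are std::sync::Mutex values, which the thread does not confirm: the original lock().add(&stats) call compiles without unwrapping a Result, which would be consistent with a non-poisoning mutex (for example parking_lot's), in which case the if let Ok(..) form would not apply. The helper names below are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Adds `delta` only if the mutex is not poisoned; returns whether it was applied.
pub fn add_skip_on_poison(total: &Mutex<i64>, delta: i64) -> bool {
    if let Ok(mut guard) = total.lock() {
        *guard += delta;
        true
    } else {
        false // the update is silently lost
    }
}

/// Adds `delta` even if the mutex is poisoned, by recovering the inner guard.
pub fn add_recover_on_poison(total: &Mutex<i64>, delta: i64) {
    let mut guard = total.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard += delta;
}

/// Builds a mutex that was poisoned by a thread panicking while holding the lock.
pub fn poisoned_mutex(initial: i64) -> Arc<Mutex<i64>> {
    let shared = Arc::new(Mutex::new(initial));
    let clone = Arc::clone(&shared);
    // The join result is an `Err` carrying the panic payload; it is ignored here.
    let _ = thread::spawn(move || {
        let _guard = clone.lock().unwrap();
        panic!("poison the mutex on purpose");
    })
    .join();
    shared
}
```

With a poisoned mutex, add_skip_on_poison returns false and leaves the total unchanged, while add_recover_on_poison still applies the update. Which behaviour is preferable for statistics depends on whether a stale total or a missing update is the lesser problem.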
Category: General
Suggestion: Prevent ownership move in drop

Avoid moving self.scan_stats by value in drop, which can hinder future uses or
complicate logging macros expecting Copy. Clone or copy the stats explicitly to
prevent ownership issues and ensure the struct remains intact during drop logging.

src/service/search/datafusion/distributed_plan/decoder_stream.rs [140]

 impl Drop for FlightDecoderStream {
     fn drop(&mut self) {
         let QueryContext {
             trace_id,
             node,
             is_super,
             is_querier,
             start,
             num_rows,
             ..
         } = &self.query_context;
 
-        let search_role = if *is_super {
-            "leader".to_string()
-        } else {
-            "follower".to_string()
-        };
-        let scan_stats = self.scan_stats;
+        let search_role = if *is_super { "leader".to_string() } else { "follower".to_string() };
+        let scan_stats = self.scan_stats.clone();
 
         log::info!(
             "{}",
             search_inspector_fields(
Suggestion importance [1-10]: 3

Why: The code assigns let scan_stats = self.scan_stats; which likely copies if ScanStats is Copy; recommending clone() assumes a Clone impl and may be unnecessary or incorrect. Impact is minor and the concern about ownership in drop is not clearly substantiated by the diff.

Impact: Low

Previous suggestions

Suggestions up to commit 20ff213
Category: Possible issue
Suggestion: Handle mutex locking safely

Guard against potential panics or deadlocks by minimizing the lock hold time and
handling poisoned locks. Store the result of lock() into a local with explicit
scoping and use ok() to skip updating shared stats if the mutex is poisoned.

src/service/search/datafusion/distributed_plan/decoder_stream.rs [59-67]

 pub fn process_custom_message(&mut self, message: CustomMessage) {
     match message {
         CustomMessage::ScanStats(stats) => {
+            // Update local accumulator first
             self.scan_stats.add(&stats);
-            self.query_context.scan_stats.lock().add(&stats);
+            // Update shared context with minimal lock scope and safe handling
+            if let Ok(mut shared) = self.query_context.scan_stats.lock() {
+                shared.add(&stats);
+            }
         }
         CustomMessage::Metrics(metrics) => {
-            self.query_context.cluster_metrics.lock().extend(metrics);
+            if let Ok(mut shared) = self.query_context.cluster_metrics.lock() {
+                shared.extend(metrics);
+            }
         }
-        ...
+        _ => {}
     }
 }
Suggestion importance [1-10]: 6

Why: Suggestion correctly targets the new process_custom_message method and improves robustness by handling potential poisoned locks and minimizing lock scope; impact is moderate and accurate to the diff.

Impact: Low
Category: General
Suggestion: Prevent ownership move issues

Avoid potential double-move or use-after-drop issues by cloning or copying only what
is needed before logging, and ensure scan_stats is not moved if the struct might be
referenced later in logging. Prefer referencing or cloning lightweight fields
instead of moving ownership.

src/service/search/datafusion/distributed_plan/decoder_stream.rs [123-143]

 impl Drop for FlightDecoderStream {
     fn drop(&mut self) {
-        let QueryContext {
-            trace_id,
-            node,
-            is_super,
-            is_querier,
-            start,
-            num_rows,
-            ..
-        } = &self.query_context;
-        ...
-        let search_role = if *is_super {
-            "leader".to_string()
-        } else {
-            "follower".to_string()
-        };
-        let scan_stats = self.scan_stats;
+        let trace_id = self.query_context.trace_id.clone();
+        let node = self.query_context.node.clone();
+        let is_super = self.query_context.is_super;
+        let is_querier = self.query_context.is_querier;
+        let start = self.query_context.start;
+        let num_rows = self.query_context.num_rows;
+
+        let search_role = if is_super { "leader" } else { "follower" };
+        let scan_stats = self.scan_stats; // Copy if Copy; else clone if needed
 
         log::info!(
             "{}",
             search_inspector_fields(
Suggestion importance [1-10]: 3

Why: The concern about moves is low-risk here; fields are accessed by reference and scan_stats appears Copy-like in usage. The proposed change is mostly stylistic and partly speculative about ownership semantics.

Impact: Low

@haohuaijin changed the title from "fix: response node return full scan_stats" to "fix: scan_stats for each node info" Sep 2, 2025
@haohuaijin haohuaijin marked this pull request as ready for review September 2, 2025 10:17
@github-actions bot (Contributor) commented Sep 2, 2025

Persistent review updated to latest commit 20ff213

@greptile-apps bot (Contributor) left a comment

Greptile Summary

This PR fixes an issue with scan statistics reporting in distributed search scenarios by introducing proper per-node tracking in the FlightDecoderStream implementation. The core problem was that individual nodes were not accurately tracking and reporting their own scan statistics, relying instead on shared scan_stats from QueryContext which could lead to incomplete or inaccurate per-node metrics.

The solution introduces a local scan_stats field to each FlightDecoderStream instance, initialized with default values. When processing custom messages containing scan statistics, the code now performs dual accumulation - updating both the local instance statistics and the shared context statistics. This dual approach ensures that:

  1. Each node maintains accurate local statistics for its own operations
  2. The global aggregation across all nodes continues to work correctly
  3. The Drop implementation can access precise per-node statistics for logging purposes

The changes are minimal but strategic, affecting the struct initialization, message processing logic, and the cleanup phase. This modification aligns with the distributed architecture where individual nodes need to track their own performance metrics while contributing to system-wide aggregation. The fix ensures that monitoring and observability features work correctly at both the individual node level and cluster level, which is crucial for debugging and performance analysis in distributed search operations.
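
The dual accumulation described above can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: ScanStats, its fields, NodeStream, and on_scan_stats are hypothetical names, and the shared total is modelled with std::sync::Mutex, which may differ from the mutex type the project actually uses.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the real `ScanStats`; the field names are assumptions.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct ScanStats {
    pub files: i64,
    pub records: i64,
}

impl ScanStats {
    /// Accumulates another set of stats into this one.
    pub fn add(&mut self, other: &ScanStats) {
        self.files += other.files;
        self.records += other.records;
    }
}

/// Hypothetical stand-in for one decoder stream, i.e. one remote node's results.
pub struct NodeStream {
    /// Stats seen by this stream only; this is what per-node logging would read.
    pub scan_stats: ScanStats,
    /// Total shared by every stream of the same query.
    pub shared: Arc<Mutex<ScanStats>>,
}

impl NodeStream {
    pub fn new(shared: Arc<Mutex<ScanStats>>) -> Self {
        Self { scan_stats: ScanStats::default(), shared }
    }

    /// Records incoming stats in both the local and the shared accumulator.
    pub fn on_scan_stats(&mut self, stats: ScanStats) {
        self.scan_stats.add(&stats);
        self.shared
            .lock()
            .expect("shared stats mutex poisoned")
            .add(&stats);
    }
}
```

With two streams feeding one shared total, each stream's scan_stats holds only what that stream received, while the shared value is the sum across both. Logging the local value on drop therefore reports per-node numbers instead of the running total.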

Confidence score: 4/5

  • This PR is safe to merge with minimal risk as it introduces local state tracking without breaking existing functionality
  • Score reflects straightforward logic changes that improve statistics accuracy without modifying core search behavior
  • Pay close attention to the dual accumulation pattern to ensure both local and shared statistics are properly maintained

1 file reviewed, no comments


@haohuaijin haohuaijin merged commit 95439dc into main Sep 2, 2025
31 of 32 checks passed
@haohuaijin haohuaijin deleted the fix-single-node-scan_stats branch September 2, 2025 11:21
Labels

☢️ Bug Something isn't working Review effort 2/5
