Skip to content

Conversation

@hengfeiyang
Copy link
Contributor

@hengfeiyang hengfeiyang commented Nov 1, 2024

Use streaming to write parquet files instead of obtaining the entire RecordBatch and writing to parquet, significantly reducing memory usage on the Compactor.

For example we set ZO_COMPACT_MAX_FILE_SIZE = 10240, The version v0.13.0 merge file need 10GB memory, But it only need 500MB with this PR.

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced asynchronous functions for flexible file retrieval and byte range access.
    • Added support for batch operations in file management.
    • Enhanced logging and error handling across various components.
  • Bug Fixes

    • Improved error handling in search request processing and database interactions.
  • Documentation

    • Clarified documentation for notification methods and email subject assignments.
  • Chores

    • Minor adjustments to logging messages for clarity and consistency.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 1, 2024

Walkthrough

The pull request introduces new asynchronous functions for file retrieval and data processing in the caching and storage modules. Specifically, a get function is added to retrieve files from memory, disk, or storage with optional byte range support. Additionally, a get_range function is introduced for retrieving specific byte ranges from files in storage. Enhancements in file processing, merging, and error handling are made in various modules, including improved logging and restructuring of existing functions, particularly in parquet file handling and search functionalities.

Changes

File Path Change Summary
src/infra/src/cache/file_data/mod.rs - Added pub async fn get(file: &str, range: Option<Range<usize>>) -> Result<bytes::Bytes, anyhow::Error> for file retrieval.
- Imported Range from std::ops.
src/infra/src/storage/mod.rs - Added pub async fn get_range(file: &str, range: Range<usize>) -> Result<bytes::Bytes, anyhow::Error> for byte range retrieval.
- Imported Range from std::ops.
src/job/files/parquet.rs - Replaced merge_parquet_files_by_datafusion with merge_parquet_files in merge_files.
- Enhanced error handling in move_files.
- Updated prepare_files for file processing checks.
- Updated method signatures for clarity.
src/service/compact/merge.rs - Updated pub async fn merge_files(...) to return Result<(String, FileMeta, Vec<FileKey>), anyhow::Error>.
- Added async fn cache_remote_files(files: &[FileKey]).
- Added fn generate_schema_diff(...).
src/service/search/cluster/flight.rs - Added logging for node counts in search.
- Updated check_work_group for conditional compilation.
- Enhanced logging in get_inverted_index_file_lists.
- Updated function signatures with logging additions.
src/service/search/datafusion/exec.rs - Removed pub async fn convert_parquet_file(...).
- Updated pub async fn merge_parquet_files(...) parameters and return type.
src/config/src/meta/inverted_index/mod.rs - Removed pub struct IndexFileMetas and associated methods.
src/config/src/meta/inverted_index/reader.rs - Added new methods and updated IndexReader and FieldReader for enhanced metadata handling.
src/service/search/grpc/storage.rs - Improved logging and error handling in the search function and related components.
src/service/db/file_list/remote.rs - Adjusted logging format in the cache function for improved readability.

Possibly related PRs

Suggested reviewers

  • haohuaijin
  • Subhra264
  • oasisk

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@hengfeiyang hengfeiyang marked this pull request as draft November 1, 2024 16:32
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (3)
src/infra/src/storage/mod.rs (1)

83-86: Add documentation and improve error handling

As this is a public API, consider these improvements:

  1. Add rustdoc documentation with examples
  2. Wrap errors with more context using .context()
  3. Consider validating range values

Here's a suggested improvement:

+/// Retrieves a specific byte range from a file in storage.
+///
+/// # Arguments
+/// * `file` - The path to the file
+/// * `range` - The byte range to retrieve
+///
+/// # Examples
+/// ```
+/// let data = storage::get_range("path/to/file", 0..100).await?;
+/// ```
 pub async fn get_range(file: &str, range: Range<usize>) -> Result<bytes::Bytes, anyhow::Error> {
-    let data = DEFAULT.get_range(&file.into(), range).await?;
+    let data = DEFAULT
+        .get_range(&file.into(), range)
+        .await
+        .with_context(|| format!("Failed to get range from file: {}", file))?;
     Ok(data)
 }
src/service/search/cluster/flight.rs (1)

Line range hint 590-614: Consider optimizing node indexing in partition_file_by_hash.

The current implementation uses string-based HashMap lookups in a potentially hot path. Consider using integer IDs or a more efficient indexing mechanism to improve performance.

     let mut node_idx = HashMap::with_capacity(nodes.len());
     let mut idx = 0;
     for node in nodes {
         if !node.is_querier() {
             continue;
         }
-        node_idx.insert(&node.name, idx);
+        node_idx.insert(node.id, idx);  // Use node.id instead of node.name
         idx += 1;
     }
     let mut partitions = vec![Vec::new(); idx];
     for fk in file_id_list {
         let node_name =
             infra_cluster::get_node_from_consistent_hash(&fk.id.to_string(), &Role::Querier, group)
                 .await
                 .expect("there is no querier node in consistent hash ring");
-        let idx = match node_idx.get(&node_name) {
+        let node_id = nodes.iter()
+            .find(|n| n.name == node_name)
+            .map(|n| n.id)
+            .expect("node not found");
+        let idx = match node_idx.get(&node_id) {
             Some(idx) => *idx,
             None => {
                 log::error!(
                     "partition_file_by_hash: {} not found in node_idx",
                     node_name
                 );
                 0
             }
         };
         partitions[idx].push(fk.id);
     }
src/service/compact/merge.rs (1)

649-650: Avoid variable shadowing of new_file_size.

The variable new_file_size is redefined here after being previously used to accumulate file sizes before caching. This could lead to confusion or potential bugs. Consider renaming the variable to something like total_original_size for clarity.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4603b26 and 01220c1.

📒 Files selected for processing (6)
  • src/infra/src/cache/file_data/mod.rs (2 hunks)
  • src/infra/src/storage/mod.rs (2 hunks)
  • src/job/files/parquet.rs (1 hunks)
  • src/service/compact/merge.rs (7 hunks)
  • src/service/search/cluster/flight.rs (1 hunks)
  • src/service/search/datafusion/exec.rs (5 hunks)
🧰 Additional context used
📓 Path-based instructions (6)
src/infra/src/cache/file_data/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/storage/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/job/files/parquet.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/merge.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/search/cluster/flight.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/search/datafusion/exec.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

🔇 Additional comments (17)
src/infra/src/storage/mod.rs (1)

16-16: LGTM: Clean import addition

The Range import is properly placed and necessary for the new functionality.

src/infra/src/cache/file_data/mod.rs (1)

19-19: LGTM!

The Range import is correctly placed and necessary for the new range-based retrieval functionality.

src/service/search/cluster/flight.rs (1)

137-141: LGTM: Logging enhancement improves observability.

The added logging statement provides valuable operational insights into node availability and distribution.

src/service/search/datafusion/exec.rs (13)

29-29: Updated imports to include necessary DataFusion modules

The added imports for DataType, Schema, and DataFusion modules ensure that all required types and traits are available for the updated functions.

Also applies to: 33-33


47-47: Importing execute_stream for streaming execution

Including physical_plan::execute_stream enables efficient streaming execution of the physical plan, which can improve performance during data processing.


50-50: Added TryStreamExt for asynchronous stream handling

Importing futures::TryStreamExt provides the try_next method used for asynchronously iterating over the stream of record batches.


61-61: Imported NewUnionTable for table registration

Including NewUnionTable allows for the combination of multiple table providers into a single logical table, which is utilized in the merge_parquet_files function.


72-75: Refactored merge_parquet_files function signature

The updated function signature now accepts tables, bloom_filter_fields, and metadata as parameters, improving flexibility and modularity in how tables are merged.


Line range hint 80-94: Enhanced SQL query construction for data retrieval

The SQL query logic is refined to handle different stream_type conditions, ensuring that data is accurately retrieved and sorted based on the timestamp column.


101-104: Prepared DataFusion context with updated parameters

Setting sort_by_timestamp_desc and target_partitions before preparing the DataFusion context optimizes query execution according to the configuration.


106-107: Utilizing NewUnionTable for efficient data merging

Creating a union table from the provided tables simplifies data access and allows for efficient querying across multiple data sources.


109-112: Creating and executing logical and physical plans

The logical and physical plans are correctly generated from the SQL query, and the schema is obtained from the physical plan, ensuring accurate data processing.


114-115: Initializing Parquet writer with correct schema and metadata

The Parquet writer is properly initialized with the buffer, schema, bloom filter fields, and metadata, preparing it for writing the merged data.


116-134: Efficiently writing data to Parquet file in a streaming manner

The loop processes the data stream asynchronously, writing each batch to the Parquet file and handling errors appropriately. This approach optimizes memory usage and performance.


134-134: Closing the Parquet writer properly

Calling writer.close().await?; ensures that all data is flushed to the buffer and resources are released, which is essential for data integrity.


144-144: Returning the merged schema and data buffer

The function returns the schema and the buffer containing the merged Parquet data as expected, completing the data merging operation.

src/job/files/parquet.rs (1)

717-717: Verify all references to merge_parquet_files_by_datafusion have been updated

With the replacement of merge_parquet_files_by_datafusion with merge_parquet_files at line 717, please ensure that all other references to the old function have been updated accordingly in the codebase.

Run the following script to verify:

✅ Verification successful

No remaining references to the old function found

The verification shows that there are no references to merge_parquet_files_by_datafusion in the codebase, and all occurrences are using the new merge_parquet_files function. The function is properly defined in src/service/compact/merge.rs and is being used consistently across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for any remaining references to `merge_parquet_files_by_datafusion` in the codebase.

# Test: Use ripgrep to find any occurrences. Expect: No matches found.
rg 'merge_parquet_files_by_datafusion'

Length of output: 38


Script:

#!/bin/bash
# Let's verify both the old and new function names to understand the changes better
# First, check for the old function definition and usage
ast-grep --pattern 'fn merge_parquet_files_by_datafusion'

# Then check for the new function definition and usage
ast-grep --pattern 'fn merge_parquet_files'

# Also search with ripgrep for any string occurrences including in comments and tests
rg -A 2 'merge_parquet_files'

Length of output: 3563

@hengfeiyang hengfeiyang marked this pull request as ready for review November 7, 2024 08:45
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Outside diff range and nitpick comments (31)
deploy/build/buildspec-tag-arm64-pi.yml (1)

19-21: Consider improving feature management in build scripts.

The build script currently uses multiple sed commands to modify features. Consider creating a dedicated configuration file or using build arguments to manage feature flags more maintainably.

Example approach:

      # disable gxhash
-     - sed -i 's/default = \[\"gxhash\"\]/default = []/g' src/config/Cargo.toml
-     - sed -i 's/+aes,//g' .cargo/config.toml
+     # Create a features.sh script
+     - |
+       cat > scripts/configure_features.sh << 'EOF'
+       #!/bin/bash
+       # Disable specific features for ARM64 Pi builds
+       features_to_disable=(
+         "s/default = \[\"gxhash\"\]/default = []/g:src/config/Cargo.toml"
+         "s/+aes,//g:.cargo/config.toml"
+       )
+       for feature in "${features_to_disable[@]}"; do
+         IFS=: read -r pattern file <<< "$feature"
+         sed -i "$pattern" "$file"
+       done
+       EOF
+     - chmod +x scripts/configure_features.sh
+     - ./scripts/configure_features.sh
🧰 Tools
🪛 yamllint

[error] 21-21: trailing spaces

(trailing-spaces)

src/config/src/meta/inverted_index/search.rs (1)

Line range hint 111-129: Consider adding error handling for bitmap size mismatches.

While the changes effectively implement the streaming approach, consider adding explicit error handling for cases where bitmap sizes might not match expectations. This could prevent potential runtime issues.

 let bitmap = index_reader.get_bitmap(value).await?;
 
 // Resize if the res map is smaller than the bitmap
 if res.len() < bitmap.len() {
+    // Consider adding a size sanity check here
+    if bitmap.len() > column_index_meta.max_bitmap_size {
+        return Err(anyhow::anyhow!("Bitmap size exceeds maximum allowed size"));
+    }
     res.resize(bitmap.len(), false);
 }
src/config/src/meta/inverted_index/writer.rs (2)

58-58: Document the rationale for bitmap size increment.

The change from segment_id + 1 to segment_id + 64 appears to be a performance optimization to reduce bitmap resizing frequency. However, the choice of 64 as the increment value should be documented.

Consider adding a comment explaining why 64 was chosen as the growth factor:

-            bitmap.resize(segment_id + 64, false);
+            // Pre-allocate space for 64 additional segments to reduce resize operations
+            bitmap.resize(segment_id + 64, false);

94-100: Consider pre-allocating writer capacity.

The implementation correctly serializes and writes metadata. However, since we know we'll be writing both the metadata and its size, consider pre-allocating the required capacity to avoid potential reallocations.

         // write meta into writer buffer
+        writer.reserve(std::mem::size_of::<u32>() + meta_bytes.len());
         let meta_bytes = serde_json::to_vec(&self.meta)?;
         writer.write_all(&meta_bytes)?;
         let metas_size = meta_bytes.len() as u32;
         writer.write_all(&metas_size.to_le_bytes())?;
         writer.flush()?;
src/config/src/utils/parquet.rs (2)

121-139: Add documentation for the new streaming reader functions

The implementation looks good, but these public functions would benefit from documentation comments explaining:

  • Their purpose in the streaming context
  • The difference from their non-streaming counterparts
  • Memory usage benefits

Add rustdoc comments like this:

/// Returns a streaming reader for parquet data from bytes, optimized for memory efficiency.
/// 
/// This function enables processing large parquet files without loading the entire dataset
/// into memory at once.
pub async fn get_recordbatch_reader_from_bytes(

141-154: Add performance considerations to documentation

The implementation correctly provides backward compatibility while enabling streaming capabilities. Consider adding documentation that helps users choose between streaming (get_recordbatch_reader_*) and batch collection (read_recordbatch_*) based on their memory constraints.

Add a note about memory usage:

/// Collects all record batches into memory. For large files, consider using
/// `get_recordbatch_reader_from_bytes` instead to process data in a streaming fashion.
pub async fn read_recordbatch_from_bytes(
src/cli/basic/cli.rs (2)

294-294: Fix grammatical error in success message.

The message should use past tense for better grammar.

-    println!("command {name} execute successfully");
+    println!("command {name} executed successfully");

Line range hint 266-270: Consider enhancing error handling consistency.

The error handling could be improved in two ways:

  1. Add file existence validation before attempting deletion
  2. Use consistent error message formatting across all commands (prefer using format! macro)

Example improvement:

match file_list::delete_parquet_file(file, true).await {
    Ok(_) => {
        println!("delete parquet file {} successfully", file);
    }
    Err(e) => {
        println!("{}", format!("delete parquet file {} failed: {}", file, e));
    }
}
src/job/files/idx.rs (1)

Line range hint 1-329: LGTM: Successful implementation of streaming parquet writing.

The changes effectively implement streaming parquet writing, which should significantly reduce memory usage during compaction. The code structure is clean and maintainable.

Consider adding metrics to monitor:

  1. Memory usage during compaction
  2. Time taken for index file generation
  3. Success/failure rates of file uploads

This will help validate the performance improvements and identify potential bottlenecks.

src/service/dashboards/reports.rs (3)

382-382: Consider validating the email subject.

While the simplified subject assignment is good, consider adding validation to ensure the title is not empty before using it as the email subject. This would prevent emails with empty subjects which might affect deliverability or user experience.

-            .subject(self.title.to_string());
+            .subject(if self.title.is_empty() {
+                "Dashboard Report".to_string()
+            } else {
+                self.title.to_string()
+            });

Line range hint 516-516: Replace fixed sleep durations with proper wait conditions.

The current implementation uses fixed sleep durations which could be unreliable across different environments and load conditions. Consider using proper wait conditions or exponential backoff instead.

-    tokio::time::sleep(Duration::from_secs(5)).await;
+    // Wait for the dashboard to be fully loaded
+    page.wait_for_element("div#dashboard-container").await?;
+    page.wait_for_function("() => !document.querySelector('.loading-indicator')").await?;

-    tokio::time::sleep(Duration::from_secs(2)).await;
+    // Wait for organization switch to complete
+    page.wait_for_function("() => window.location.href.includes(arguments[0])", vec![org_id.into()]).await?;

Also applies to: 595-595


Line range hint 489-499: Improve browser automation error handling.

Consider implementing proper cleanup using Drop trait and providing more specific error messages during page navigation failures.

struct BrowserCleanup {
    browser: Browser,
    handle: tokio::task::JoinHandle<()>,
}

impl Drop for BrowserCleanup {
    fn drop(&mut self) {
        let browser = std::mem::replace(&mut self.browser, unsafe { std::mem::zeroed() });
        let handle = std::mem::replace(&mut self.handle, unsafe { std::mem::zeroed() });
        
        tokio::spawn(async move {
            if let Err(e) = browser.close().await {
                log::error!("Failed to close browser: {}", e);
            }
            if let Err(e) = handle.await {
                log::error!("Failed to join browser handler: {}", e);
            }
        });
    }
}
src/service/search/grpc/storage.rs (2)

Line range hint 519-541: Consider enhancing error context and optimizing string clone

  1. The error handling at line 540 could provide more context about the task failure.
  2. The trace_id string clone at line 519 could be avoided by using a reference.

Consider this improvement:

-        let trace_id = query.trace_id.to_string();
+        let trace_id = &query.trace_id;
         let permit = semaphore.clone().acquire_owned().await.unwrap();
         // Spawn a task for each file, wherein full text search and
         // secondary index search queries are executed
         let task = tokio::task::spawn(async move {

And enhance the error context:

-        .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?;
+        .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(
+            format!("Failed to join inverted index search tasks: {}", e)
+        )))?;

632-647: Consider improving error handling and search type constants

  1. Field access error handling could be more descriptive.
  2. Search type strings could be defined as constants.

Consider these improvements:

  1. Enhance field access error handling:
-    if let Some(mut field_reader) = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await? {
+    let mut field_reader = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await?
+        .ok_or_else(|| anyhow::anyhow!("Field {} not found in index", INDEX_FIELD_NAME_FOR_ALL))?;
  1. Define search type constants:
const SEARCH_TYPE_EXACT: &str = "eq";
const SEARCH_TYPE_CONTAINS: &str = "contains";
const SEARCH_TYPE_PREFIX: &str = "prefix";

Also applies to: 670-673

src/service/compact/merge.rs (5)

433-434: Consider optimizing the FileSize strategy implementation.

The FileSize strategy's early break could lead to suboptimal merging when files are sorted by size. Consider accumulating files until reaching the optimal chunk size instead of breaking early.

-if job_strategy == MergeStrategy::FileSize {
-    break;
-}
-new_file_size = 0;
-new_file_list.clear();
-continue; // this batch don't need to merge, skip
+// Continue accumulating files even if we exceed the size limit
+// This ensures we get closer to the optimal chunk size
+new_file_list.push(file.clone());
+new_file_size += file.meta.original_size;

Also applies to: 456-461


657-658: Consider using fold for efficient record counting.

The separate mapping operations for records and file size could be combined into a single fold operation for better performance.

-let total_records = new_file_list.iter().map(|f| f.meta.records).sum();
-let new_file_size = new_file_list.iter().map(|f| f.meta.original_size).sum();
+let (total_records, new_file_size) = new_file_list.iter().fold((0, 0), |(records, size), f| {
+    (records + f.meta.records, size + f.meta.original_size)
+});

Also applies to: 668-668


1195-1211: Add input validation for schema diff generation.

Consider validating input parameters and handling edge cases.

 fn generate_schema_diff(
     schema: &Schema,
     schema_latest_map: &HashMap<&String, &Arc<Field>>,
 ) -> Result<HashMap<String, DataType>, anyhow::Error> {
+    if schema.fields().is_empty() {
+        return Err(anyhow::anyhow!("Empty schema provided"));
+    }
+
     let mut diff_fields = HashMap::new();
     // ... rest of the implementation

798-868: Improve error handling in FST index generation.

Consider adding more detailed error context when FST generation fails.

-generate_fst_inverted_index(
+generate_fst_inverted_index(
     &new_file_key,
     &full_text_search_fields,
     &index_fields,
     Some(&retain_file_list),
     schema,
     &mut reader,
-)
-.await?;
+)
+.await
+.map_err(|e| anyhow::anyhow!("Failed to generate FST index for {}: {}", new_file_key, e))?;

Line range hint 1113-1129: Optimize RecordBatch concatenation.

Consider pre-allocating the capacity for the concatenated batch to improve performance.

 let new_batch = if inverted_idx_batches.len() == 1 {
     inverted_idx_batches.remove(0)
 } else {
     let new_schema = inverted_idx_batches.first().unwrap().schema();
+    // Pre-calculate total rows for better memory allocation
+    let total_rows = inverted_idx_batches.iter().map(|b| b.num_rows()).sum();
+    // TODO: Add capacity hint to concat_batches function
     concat_batches(new_schema, inverted_idx_batches).map_err(anyhow::Error::from)?
 };
src/job/files/parquet.rs (3)

696-705: Consider enhancing error handling in merge operation.

The error handling could be improved by adding context information to the error chain. This would help in debugging issues when the merge operation fails.

-    let merge_result = merge_parquet_files_by_datafusion(
+    let merge_result = merge_parquet_files_by_datafusion(
         stream_type,
         &stream_name,
         schema,
         tables,
         &bloom_filter_fields,
         &new_file_meta,
-    ).await;
+    ).await.with_context(|| format!(
+        "Failed to merge parquet files for stream: {}/{}", 
+        stream_type, stream_name
+    ));

1147-1177: Consider optimizing term processing for better performance.

The current implementation processes terms sequentially. Consider using parallel processing for large datasets:

-            let terms = (0..num_rows)
-                .flat_map(|i| {
-                    split_token(column_data.value(i), &cfg.common.inverted_index_split_chars)
-                        .into_iter()
-                        .map(|s| (s, i))
-                        .collect::<Vec<_>>()
-                })
-                .collect::<Vec<_>>();
+            let terms = (0..num_rows)
+                .into_par_iter()
+                .flat_map(|i| {
+                    split_token(column_data.value(i), &cfg.common.inverted_index_split_chars)
+                        .into_par_iter()
+                        .map(|s| (s, i))
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>();

This would require adding rayon as a dependency for parallel iteration support.


1494-1506: Consider using a pre-allocated buffer for better memory efficiency.

The current implementation creates new buffers for each column. Consider pre-allocating a reusable buffer:

+    let mut reusable_buf = Vec::with_capacity(1024 * 1024); // Pre-allocate 1MB
     for (column_name, indexer) in indexers {
         if indexer.is_empty() {
             continue;
         }
-        let mut buf = Vec::new();
+        reusable_buf.clear(); // Reuse the buffer
         let _index_meta = indexer.write(&mut buf).context(format!(
             "Error constructing FST ColumnIndex for field {}",
             column_name
         ))?;
         original_size += buf.len();
-        puffin_writer.add_blob(column_name, buf)?;
+        puffin_writer.add_blob(column_name, reusable_buf.clone())?;
     }
src/handler/http/request/search/mod.rs (1)

150-169: Consider adding unit tests for the parameter setting logic.

The new parameter handling logic would benefit from unit tests to verify:

  1. Parameters are not overwritten when already set
  2. Error handling works as expected
  3. Empty/None cases are handled correctly

Would you like me to help create unit tests for these scenarios?

src/config/src/config.rs (1)

1590-1590: Improve error message clarity.

The error message could be more descriptive by including:

  1. The current invalid value
  2. The purpose of this configuration

Consider updating the error message to be more informative:

-            "ZO_INVERTED_INDEX_STORE_FORMAT must be one of both, parquet."
+            "Invalid ZO_INVERTED_INDEX_STORE_FORMAT value. This setting controls how inverted indices are stored. Allowed values are: 'both' or 'parquet' (default)."
src/config/src/meta/puffin/writer.rs (2)

171-174: Typographical error: variable file_metdadata should be file_metadata

There is a typo in the variable name file_metdadata. Correcting it to file_metadata will improve code readability and prevent potential confusion.

Apply this diff to correct the typo:

     fn get_payload(&mut self) -> Result<Vec<u8>> {
-        let file_metdadata = PuffinMeta {
+        let file_metadata = PuffinMeta {
             blobs: mem::take(&mut self.blob_metadata),
             properties: mem::take(&mut self.file_properties),
         };
-        serde_json::to_vec(&file_metdadata).context("Error serializing puffin metadata")
+        serde_json::to_vec(&file_metadata).context("Error serializing puffin metadata")
     }

171-174: Inconsistent naming of blob metadata fields across structs

There is inconsistency in the naming of blob metadata fields:

  • In PuffinBytesWriter, the field is blobs_metadata.
  • In PuffinFooterWriter, it's blob_metadata.
  • In PuffinMeta, it's blobs.

For consistency and improved readability, consider standardizing the field names across these structs.

src/common/utils/stream.rs (1)

92-101: Optimize min and max computation using Arrow compute functions

Manually iterating over each value to find min and max can be inefficient. Utilize Arrow's compute::min and compute::max functions for better performance.

Apply this diff to use compute functions:

+        use arrow::compute::{min as compute_min, max as compute_max};
         // ...
-        for i in 0..num_row {
-            if min_col.is_valid(i) {
-                let val = min_col.value(i);
-                if val < min_val {
-                    min_val = val;
-                }
-            }
-            if max_col.is_valid(i) {
-                let val = max_col.value(i);
-                if val > max_val {
-                    max_val = val;
-                }
-            }
-        }
+        if let Some(batch_min) = compute_min(min_col)? {
+            if batch_min < min_val {
+                min_val = batch_min;
+            }
+        }
+        if let Some(batch_max) = compute_max(max_col)? {
+            if batch_max > max_val {
+                max_val = batch_max;
+            }
+        }
src/config/src/meta/puffin/reader.rs (1)

222-227: Validate blob offsets without assuming continuity.

In the validate_payload function, the code assumes that each blob.offset is equal to the cumulative offset, which may not hold if blobs are not stored sequentially without gaps. This could lead to false positives when validating blob offsets. Consider verifying that each blob's offset and length are within the valid range without assuming continuity.

Consider modifying the validation logic as follows:

             for blob in &puffin_metadata.blobs {
-                ensure!(
-                    blob.offset == offset,
-                    anyhow!("Blob payload offset mismatch")
-                );
-                offset += blob.length;
+                ensure!(
+                    blob.offset + blob.length <= self.head_magic_offset(),
+                    anyhow!("Blob payload exceeds file boundaries")
+                );
             }
src/config/src/meta/search.rs (1)

Line range hint 711-743: Consider using the strum crate to derive Display and FromStr implementations

The manual implementations of the Display and FromStr traits for SearchEventType can be simplified by leveraging the strum crate's Display and EnumString derives. This reduces boilerplate code and improves maintainability.

Apply this diff to implement the change:

+use strum_macros::{Display, EnumString};

 #[derive(Hash, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
+#[derive(Display, EnumString)]
+#[strum(serialize_all = "lowercase")]
 pub enum SearchEventType {
     UI,
     Dashboards,
     Reports,
     Alerts,
-    Values,
+    #[strum(serialize = "_values", serialize = "values")]
     Values,
     Other,
     RUM,
-    DerivedStream,
+    #[strum(serialize = "derived_stream", serialize = "derivedstream")]
+    DerivedStream,
 }
 
-impl std::fmt::Display for SearchEventType {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            SearchEventType::UI => write!(f, "ui"),
-            SearchEventType::Dashboards => write!(f, "dashboards"),
-            SearchEventType::Reports => write!(f, "reports"),
-            SearchEventType::Alerts => write!(f, "alerts"),
-            SearchEventType::Values => write!(f, "_values"),
-            SearchEventType::Other => write!(f, "other"),
-            SearchEventType::RUM => write!(f, "rum"),
-            SearchEventType::DerivedStream => write!(f, "derived_stream"),
-        }
-    }
-}
-
-impl FromStr for SearchEventType {
-    type Err = String;
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        let s = s.to_lowercase();
-        match s.as_str() {
-            "ui" => Ok(SearchEventType::UI),
-            "dashboards" => Ok(SearchEventType::Dashboards),
-            "reports" => Ok(SearchEventType::Reports),
-            "alerts" => Ok(SearchEventType::Alerts),
-            "values" | "_values" => Ok(SearchEventType::Values),
-            "other" => Ok(SearchEventType::Other),
-            "rum" => Ok(SearchEventType::RUM),
-            "derived_stream" | "derivedstream" => Ok(SearchEventType::DerivedStream),
-            _ => Err(format!("Invalid search event type: {s}")),
-        }
-    }
-}
src/infra/src/file_list/postgres.rs (2)

Line range hint 82-84: Implement or remove the batch_add_with_id method

The batch_add_with_id method is currently unimplemented. If this method is intended for future use, please implement it. Otherwise, consider removing it to maintain code clarity and avoid potential confusion.


1520-1520: Adjust log level for successful operation

The log message indicates a successful operation but uses log::warn!. Consider changing it to log::info! to reflect the successful nature of the action.

Apply this diff to adjust the log level:

-            log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) successfully");
+            log::info!("[POSTGRES] create table index(file_list_stream_file_idx) successfully");
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 01220c1 and 0f7fdda.

📒 Files selected for processing (30)
  • deploy/build/buildspec-tag-arm64-pi.yml (1 hunks)
  • src/cli/basic/cli.rs (3 hunks)
  • src/common/utils/stream.rs (6 hunks)
  • src/config/src/config.rs (1 hunks)
  • src/config/src/meta/inverted_index/mod.rs (0 hunks)
  • src/config/src/meta/inverted_index/reader.rs (3 hunks)
  • src/config/src/meta/inverted_index/search.rs (6 hunks)
  • src/config/src/meta/inverted_index/writer.rs (3 hunks)
  • src/config/src/meta/puffin/mod.rs (4 hunks)
  • src/config/src/meta/puffin/reader.rs (4 hunks)
  • src/config/src/meta/puffin/writer.rs (4 hunks)
  • src/config/src/meta/search.rs (1 hunks)
  • src/config/src/utils/inverted_index.rs (2 hunks)
  • src/config/src/utils/parquet.rs (2 hunks)
  • src/handler/http/request/search/mod.rs (1 hunks)
  • src/infra/src/file_list/mysql.rs (1 hunks)
  • src/infra/src/file_list/postgres.rs (1 hunks)
  • src/infra/src/file_list/sqlite.rs (1 hunks)
  • src/job/file_list.rs (1 hunks)
  • src/job/files/idx.rs (4 hunks)
  • src/job/files/parquet.rs (15 hunks)
  • src/report_server/src/report.rs (1 hunks)
  • src/service/alerts/alert.rs (2 hunks)
  • src/service/compact/file_list.rs (1 hunks)
  • src/service/compact/merge.rs (11 hunks)
  • src/service/dashboards/reports.rs (1 hunks)
  • src/service/db/file_list/remote.rs (1 hunks)
  • src/service/search/cluster/flight.rs (1 hunks)
  • src/service/search/grpc/storage.rs (6 hunks)
  • src/service/search/sql.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • src/config/src/meta/inverted_index/mod.rs
✅ Files skipped from review due to trivial changes (6)
  • src/infra/src/file_list/mysql.rs
  • src/infra/src/file_list/sqlite.rs
  • src/job/file_list.rs
  • src/service/compact/file_list.rs
  • src/service/db/file_list/remote.rs
  • src/service/search/sql.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/service/search/cluster/flight.rs
🧰 Additional context used
📓 Path-based instructions (21)
src/cli/basic/cli.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/common/utils/stream.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/config.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/inverted_index/reader.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/inverted_index/search.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/inverted_index/writer.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/puffin/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/puffin/reader.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/puffin/writer.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/search.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/utils/inverted_index.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/utils/parquet.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/handler/http/request/search/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/postgres.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/job/files/idx.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/job/files/parquet.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/report_server/src/report.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/alerts/alert.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/merge.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/dashboards/reports.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/search/grpc/storage.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

🔇 Additional comments (35)
deploy/build/buildspec-tag-arm64-pi.yml (1)

20-20: Verify the impact of removing AES CPU feature.

While removing the AES CPU feature might be necessary for compatibility with Pi devices, this could affect cryptographic performance. Please ensure this change has been tested on target Pi devices.

✅ Verification successful

Removing AES CPU feature is safe for ARM64 Pi builds

Based on the codebase analysis, removing the AES CPU feature from .cargo/config.toml is safe because:

  1. No direct cryptographic implementations using hardware AES acceleration were found in the Rust code
  2. The project's cryptographic operations are handled through high-level dependencies (like argon2, jsonwebtoken) which automatically fall back to software implementations when hardware features are unavailable
  3. This change only affects the ARM64 Pi build target, maintaining compatibility with devices that may not have AES hardware acceleration
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any cryptographic implementations that might be affected
# by the removal of AES CPU feature

# Look for potential crypto/AES usage in the codebase
rg -l 'aes|crypto|cipher' --type rust

# Check if there are any relevant configuration or feature flags in Cargo.toml files
fd Cargo.toml --exec grep -l 'aes\|crypto\|cipher' {}

Length of output: 97


Script:

#!/bin/bash
# Let's try a broader search to understand the context better

# 1. First, let's see what's in the .cargo/config.toml to understand the AES config
fd -g '.cargo/config.toml' --exec cat {}

# 2. Check for any dependencies in Cargo.toml that might use crypto features
fd Cargo.toml --exec cat {}

# 3. Look for any encryption-related code with broader terms
rg -l 'encrypt|decrypt|cipher|hash|crypto' --type-not lock

Length of output: 19705

src/config/src/meta/inverted_index/search.rs (4)

9-9: LGTM: Import change aligns with refactoring.

The import change correctly reflects the transition to using FieldReader.


89-96: LGTM: Consistent implementation with other search changes.

The changes align with the overall refactoring pattern.

#!/bin/bash
# Verify that all calls to ExactSearch::search have been updated
rg -p "ExactSearch.*?search" --type rust

54-61: LGTM: Consistent implementation with SubstringSearch changes.

The changes maintain functional parity while simplifying the interface.

#!/bin/bash
# Verify that all calls to PrefixSearch::search have been updated
rg -p "PrefixSearch.*?search" --type rust

26-33: LGTM: Simplified method signature improves code clarity.

The transition to FieldReader removes unnecessary generic constraints while maintaining functionality. This change supports the PR's goal of implementing a streaming approach.

✅ Verification successful

LGTM: Verified that all SubstringSearch usage is compatible with the changes

The search results confirm that:

  • The only caller of SubstringSearch::search is in src/service/search/grpc/storage.rs and it's already using FieldReader
  • The implementation in src/config/src/meta/inverted_index/search.rs is consistent across all search types (ExactSearch, PrefixSearch, SubstringSearch)
  • No other usages of SubstringSearch::search were found that might be incompatible with the changes
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that all calls to SubstringSearch::search have been updated
rg -p "SubstringSearch.*?search" --type rust

Length of output: 194


Script:

#!/bin/bash
# Let's try a broader search to find all usages of search method in the codebase
rg -p "search.*?FieldReader" --type rust -A 3 -B 3

# Also search for SubstringSearch struct definition and implementations
ast-grep --pattern 'impl $_ for SubstringSearch'

# And direct usage of SubstringSearch
rg "SubstringSearch" --type rust -A 3 -B 3

Length of output: 2762

src/config/src/meta/inverted_index/writer.rs (2)

78-79: LGTM! Efficient ownership transfer using std::mem::take.

Good optimization using take to avoid unnecessary cloning while iterating over the sorter.


88-90: Verify the necessity of resetting index_size to 0.

The index_size is reset to 0 before calculating offsets. This seems suspicious as it might affect the relative offset calculations. Please verify if this is intentional or if it should maintain its previous value.

✅ Verification successful

Let me gather more context about the usage of index_size in the writer implementation.


Let me try a different approach to get the context around the writer implementation.


Resetting index_size to 0 is correct and necessary

The index_size reset is intentional and correct. Here's why:

  • The index_size is used during the bitmap writing phase in append_value() to track offsets for each bitmap (lines 117-124)
  • After all bitmaps are written and before FST is written, resetting to 0 is correct because:
    • The bitmap writing phase is complete (all offsets are stored in FST)
    • The FST and metadata are written after this point
    • The stored offsets in FST remain valid as they were calculated during bitmap writing
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other usages of index_size to understand the impact
rg -p "index_size" --type rust

Length of output: 362


Script:

#!/bin/bash
# Get the full implementation context of the writer
ast-grep --pattern 'impl$_ InvertedIndexWriter {
  $$$
}'

Length of output: 59


Script:

#!/bin/bash
# Get more context around the index_size usage in writer.rs
rg -p -B 10 -A 10 "index_size" src/config/src/meta/inverted_index/writer.rs

Length of output: 1862

src/config/src/utils/parquet.rs (1)

27-30: LGTM: Import changes align with new streaming functionality

The new imports support the transition to streaming parquet data processing, which is essential for reducing memory usage.

src/config/src/utils/inverted_index.rs (3)

69-71: Memory optimization looks good

The simplified implementation aligns well with the PR's objective of reducing memory usage during compaction. Direct buffer usage without intermediate steps is more efficient.


69-71: Verify if buffer validation is handled elsewhere

The previous implementation likely included validation of the buffer contents. Please confirm that this validation is now handled either:

  1. Inside IndexReader::new
  2. At a higher level before calling this function
#!/bin/bash
# Look for validation in IndexReader implementation
ast-grep --pattern 'impl IndexReader {
  $$$
  fn new($$$) {
    $$$
  }
  $$$
}'

18-25: Verify error handling after removing anyhow macros

The removal of anyhow macros while keeping anyhow::Result suggests a simplified error handling approach. Please ensure that all error cases are still properly handled and propagated.

✅ Verification successful

Error handling is properly maintained after macro removal

The code shows a well-maintained error handling approach:

  • The create_index_reader_from_puffin_bytes function still returns Result<IndexReader<...>> and handles errors appropriately
  • The function has been simplified to a direct constructor call that propagates errors through Ok()
  • No complex error handling was needed after the simplification, making the anyhow macros removal justified
  • Other functions in the file use proper error handling through Option types and explicit returns
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining anyhow error handling patterns
rg -A 2 'anyhow!|ensure!' src/config/src/utils/

Length of output: 83


Script:

#!/bin/bash
# Let's check the error handling patterns in the inverted_index.rs file
rg -A 5 'Result<' src/config/src/utils/inverted_index.rs

# Also check for any error propagation using '?'
rg -A 2 '\?' src/config/src/utils/inverted_index.rs

# Look for the implementation of create_index_reader_from_puffin_bytes
ast-grep --pattern 'fn create_index_reader_from_puffin_bytes($$$) -> Result<$$$> {
  $$$
}'

Length of output: 709


Script:

#!/bin/bash
# Let's look at the complete implementation of the file to understand the error handling
cat src/config/src/utils/inverted_index.rs

# Also check for any other files that might be using this module to ensure error handling is consistent
rg -l "use.*inverted_index" src/config/src/

Length of output: 10475

src/cli/basic/cli.rs (2)

135-135: LGTM! Clear and consistent message format.

The success message follows good CLI output conventions and clearly indicates the operation result.


266-266: LGTM! Well-structured success message with proper error handling.

The success message is consistent with the CLI's message format and is paired with appropriate error handling.

src/job/files/idx.rs (3)

27-29: LGTM: Import changes align with streaming parquet implementation.

The new imports support the refactored parquet writing functionality and enhanced record batch operations.


74-74: LGTM: Improved logging clarity with specific context.

The "[JOB:IDX]" prefix helps in better identifying index-related operations in logs.

Also applies to: 126-129, 133-133


100-105: Consider adding error handling for populate_file_meta.

While the FileMeta initialization is improved, the populate_file_meta call could benefit from more robust error handling. Consider handling the case where "min_ts" or "max_ts" columns don't exist.

-        populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await?;
+        if let Err(e) = populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await {
+            log::warn!("[JOB:IDX] Failed to populate timestamp metadata: {}", e);
+            // Continue without timestamp metadata
+        }
src/report_server/src/report.rs (1)

391-391: LGTM: Simplified string conversion

The change from format! to to_string() is more idiomatic and efficient for simple string conversion.

src/service/search/grpc/storage.rs (2)

138-138: LGTM: Improved log message formatting

The addition of a space before "ms" enhances log readability.


554-563: LGTM: Appropriate use of debug logging

The debug log messages provide useful information for troubleshooting while maintaining appropriate log levels.

src/service/alerts/alert.rs (2)

Line range hint 358-364: LGTM! Documentation improvement

The updated documentation clearly describes the return type and its components.


559-559: LGTM! Code simplification

The simplified email subject assignment is more idiomatic and maintains the same functionality.

src/service/compact/merge.rs (1)

1131-1193: LGTM with existing review comments.

The implementation looks good, noting that there's an existing review comment about handling semaphore permit acquisition errors.

src/job/files/parquet.rs (2)

792-799: LGTM! Well-structured index generation functions.

The index generation functions are well-organized with consistent parameter handling and proper error management.

Also applies to: 964-971


Line range hint 1-1517: Overall implementation looks solid and achieves the PR objectives.

The changes successfully implement a streaming approach for writing parquet files, which significantly reduces memory usage during compaction. The code is well-structured with proper error handling and logging. The suggested optimizations can be considered for future improvements.

src/handler/http/request/search/mod.rs (2)

150-161: LGTM: Improved error handling and conditional setting of search parameters.

The changes enhance robustness by:

  1. Only setting values if they are None
  2. Using proper error handling with match expressions
  3. Preserving existing values when present

164-169: LGTM: Consistent error handling for index_type.

The implementation follows the same robust pattern of conditional setting and error handling used for other parameters.

src/config/src/meta/puffin/mod.rs (5)

36-36: Addition of COMPRESSED flag is appropriate.

Defining the COMPRESSED flag in PuffinFooterFlags correctly expands the available footer flags and is implemented properly.


44-44: Renaming blob_metadata to blobs enhances clarity.

Updating the field name to blobs improves readability and aligns with common naming conventions.


58-58: Use of BlobTypes enum for blob_type improves type safety.

Changing blob_type to the BlobTypes enum enhances type safety and ensures only valid blob types are used.


94-103: Introduction of BlobTypes enum enhances serialization and maintainability.

Defining BlobTypes with appropriate variants and serde attributes ensures correct serialization/deserialization and improves code maintainability.


62-76: Ensure changing integer types to unsigned doesn't introduce issues.

Switching fields, snapshot_id, sequence_number, offset, and length to unsigned integers (u32, u64) prevents negative values, which is appropriate if these values should always be non-negative. Please verify this change doesn't affect existing data or logic that might have used negative values.

Run the following script to check for any negative assignments or comparisons:

✅ Verification successful

Based on the comprehensive search results, I can now provide a final response:

The change to unsigned integers is appropriate and safe.

The codebase analysis shows that:

  1. No negative assignments or comparisons were found for these fields
  2. The fields are used in ways consistent with unsigned values:
    • offset is used for file positions and timestamps
    • length represents sizes of data blocks
    • fields, snapshot_id, and sequence_number are used as counters/identifiers
  3. The arithmetic operations found (offset + length, offset + hour_micros(1), etc.) are all additive or multiplicative, with no need for negative values

The change aligns with the semantic meaning of these fields in the Puffin format, where they represent:

  • File offsets (must be non-negative)
  • Data lengths (must be non-negative)
  • Sequence numbers (naturally non-negative)
  • Field counts (naturally non-negative)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for negative assignments or comparisons involving the affected fields.

# Search for negative assignments
rg --type rust --pcre2 '(fields|snapshot_id|sequence_number|offset|length)\s*=\s*-\d+'

# Search for negative comparisons
rg --type rust --pcre2 '(fields|snapshot_id|sequence_number|offset|length)\s*[<>=!]=\s*-'

Length of output: 180


Script:

#!/bin/bash
# Let's perform a more comprehensive analysis:

# 1. Find all usages of these fields in the codebase
rg -A 2 -B 2 '\b(fields|snapshot_id|sequence_number|offset|length)\b' --type rust

# 2. Look for type declarations or casts involving these fields
ast-grep --pattern 'let $name: i32 = $value' --lang rust
ast-grep --pattern 'let $name: i64 = $value' --lang rust

# 3. Look for arithmetic operations that might assume signed integers
rg '\b(fields|snapshot_id|sequence_number|offset|length)\s*[-+*/]\s*' --type rust

# 4. Check for any test files that might reveal expected behavior
fd -e rs test -e spec -e tests --exec grep -l 'fields\|snapshot_id\|sequence_number\|offset\|length' {}

Length of output: 207545

src/config/src/meta/puffin/writer.rs (1)

161-161: Verify that PuffinFooterFlags::DEFAULT accurately reflects compression settings

The footer flags are set to PuffinFooterFlags::DEFAULT:

buf.extend_from_slice(&PuffinFooterFlags::DEFAULT.bits().to_le_bytes());

Please verify that this correctly represents the compression state of the blobs. If any blobs are compressed, the footer flags may need to be updated accordingly.

src/common/utils/stream.rs (2)

Line range hint 166-183: Test test_populate_file_meta is correctly implemented

The test accurately verifies populate_file_meta with default field names, ensuring proper functionality.


Line range hint 202-221: Test test_populate_file_meta_with_custom_field looks good

The test effectively checks populate_file_meta with custom min and max field names, confirming expected behavior.

src/config/src/meta/puffin/reader.rs (1)

83-86: Ensure callers handle the updated return type of get_metadata.

The return type of get_metadata has changed from Result<PuffinMeta> to Result<Option<PuffinMeta>>. This means that callers need to handle the Option case appropriately. Please verify that all usages of this method have been updated to accommodate the new return type to prevent potential None value handling issues.

Run the following script to find all usages of get_metadata:

@hengfeiyang hengfeiyang merged commit df0a4a7 into main Nov 7, 2024
29 checks passed
@hengfeiyang hengfeiyang deleted the perf/compactor branch November 7, 2024 09:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants