
Conversation

@taimingl taimingl commented Jul 8, 2024

stop creating null field when merging parquet files during compaction

Summary by CodeRabbit

  • New Features

    • Enhanced schema handling to support schema versions and evolution during file merging.
    • Added method to selectively retain fields in schemas.
  • Improvements

    • Simplified query optimization logic for better performance.
    • Updated Rust toolchain to a newer nightly version for improved development experience.
  • Refactor

    • Reformatted and consolidated import statements for better code organization.
  • Chores

    • Updated Rust toolchain configurations in CI workflows.

coderabbitai bot commented Jul 8, 2024

Walkthrough

The recent updates primarily focus on refining the handling of schemas and metadata in various Rust modules. Key changes include improvements in schema version management and error handling during file merges, as well as enhancements to schema field filtering. Additionally, some functions' parameter lists were updated to incorporate metadata and schema details more effectively. The Rust toolchain was also updated to a newer nightly version, improving overall development consistency.

Changes

| File Path | Change Summary |
| --- | --- |
| src/config/src/utils/record_batch_ext.rs | Updated `concat_batches` to pass the schema and arrays directly instead of cloning the schema. |
| src/job/files/parquet.rs | Modified the `merge_parquet_files` call by adding `.metadata()` to the schema parameter under the logs condition. |
| src/service/compact/merge.rs | Updated `merge_files` to handle schema versions and evolution with improved error handling and logging. Removed the `schema` parameter; added `metadata: HashMap<String, String>` and `shared_fields: HashSet<String>`. |
| src/service/search/datafusion/exec.rs | Simplified query optimization logic in `convert_parquet_file` by removing `select_wildcard`. |
| src/utils/schema_ext.rs | Added a `retain` method to the `SchemaExt` trait to filter fields based on a `HashSet<String>`. |
| src/handler/http/request/search/mod.rs | Reformatted import statements for trace ID, search type, stream type, and cache usage functions. |
| src/handler/http/request/search/multi_streams.rs | Consolidated import statements related to `get_stream_type_from_request`. |
| .github/workflows/unit-tests.yml | Updated Rust toolchain setup from nightly-2024-03-02 to nightly-2024-07-08. |
| rust-toolchain.toml | Changed the Rust toolchain channel to nightly-2024-07-08. |


@github-actions github-actions bot added the ☢️ Bug Something isn't working label Jul 8, 2024
@taimingl taimingl marked this pull request as ready for review July 8, 2024 05:22
@taimingl taimingl requested a review from hengfeiyang July 8, 2024 05:22

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Outside diff range and nitpick comments (11)
src/config/src/utils/record_batch_ext.rs (5)

Line range hint 529-529:
Consider optimizing schema cloning.

The schema is cloned multiple times. Consider optimizing to avoid redundant cloning.

let schema = Arc::make_mut(&mut schema);

Line range hint 32-32:
Optimize redundant lookups and checks.

Consider caching the result of record.as_object().unwrap() to avoid redundant lookups.

for record in data.iter() {
    let record = record.as_object().unwrap();
    // rest of the code
}

Line range hint 200-200:
Optimize redundant conversions and checks.

Consider caching the result of batch.column_by_name(field.name()).unwrap() to avoid redundant lookups.

let col = batch.column_by_name(field.name()).unwrap();
match field.data_type() {
    // rest of the code
}

Line range hint 589-589:
Consider optimizing redundant checks.

The function performs redundant checks for stream_type == StreamType::Logs. Consider optimizing to avoid redundant checks.

if stream_type == StreamType::Logs {
    merge_parquet_files(thread_id, tmp_dir.name(), schema.metadata().clone()).await
} else {
    merge_parquet_files_by_datafusion(tmp_dir.name(), stream_type, &stream_name, schema.clone()).await
}
src/job/files/parquet.rs (6)

Line range hint 200-200:
Consider optimizing redundant checks.

The function performs redundant checks for db::compact::retention::is_deleting_stream. Consider optimizing to avoid redundant checks.

if db::compact::retention::is_deleting_stream(&org_id, stream_type, &stream_name, None) {
    // handle deletion
}

Line range hint 200-200:
Consider optimizing redundant checks.

The function performs redundant checks for cluster::is_offline. Consider optimizing to avoid redundant checks.

if cluster::is_offline() {
    break;
}

Line range hint 200-200:
Consider optimizing redundant checks.

The function performs redundant checks for PROCESSING_FILES.read().await.contains. Consider optimizing to avoid redundant checks.

if PROCESSING_FILES.read().await.contains(&file_key) {
    continue;
}

Line range hint 200-200:
Consider optimizing redundant checks.

The function performs redundant checks for schema_chk.has_fields. Consider optimizing to avoid redundant checks.

if !schema_chk.has_fields {
    // handle schema creation
}

[!TIP]
Codebase Verification

Insufficient error handling and logging in generate_index_on_ingester function.

The generate_index_on_ingester function uses unwrap, which can cause the program to panic if an error occurs. Additionally, there are no log::error or log::warn statements to log errors. It is recommended to replace unwrap with proper error handling and add error logging to improve robustness.

  • File: src/job/files/parquet.rs
  • Lines: Around 586-592
Analysis chain

Line range hint 200-200:
Verify error handling and logging.

Ensure that the error handling and logging within the index generation operation are sufficient.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and logging in generate_index_on_ingester function.

# Test: Check for error logs related to the index generation operation.
rg --type rust 'log::error!.*generate_index_on_ingester error'

Length of output: 64


Script:

#!/bin/bash
# Locate the generate_index_on_ingester function and check for error handling and logging.

# Step 1: Find the generate_index_on_ingester function definition.
ast-grep --lang rust --pattern 'fn generate_index_on_ingester($$_) { $$$ }'

# Step 2: Search for error handling (Result, Option, unwrap, expect) and logging within the function.
fd -e rs | xargs rg -A 5 -B 5 'fn generate_index_on_ingester' | rg 'Result|Option|unwrap|expect|log::error|log::warn'

Length of output: 255


Script:

#!/bin/bash
# Locate the generate_index_on_ingester function and check for error handling and logging.

# Step 1: Find the generate_index_on_ingester function definition and its body.
ast-grep --lang rust --pattern 'fn generate_index_on_ingester($$_) { $$$ }'

# Step 2: Search for error handling (Result, Option, unwrap, expect, ?) and logging within the function.
fd -e rs | xargs rg -A 20 -B 5 'fn generate_index_on_ingester' | rg 'log::error|log::warn|unwrap|expect|\?'

Length of output: 396

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 4bff0be and 23da131.

Files selected for processing (4)
  • src/config/src/utils/record_batch_ext.rs (1 hunks)
  • src/job/files/parquet.rs (1 hunks)
  • src/service/compact/merge.rs (6 hunks)
  • src/service/search/datafusion/exec.rs (1 hunks)
Files skipped from review due to trivial changes (1)
  • src/service/search/datafusion/exec.rs
Additional context used
Path-based instructions (3)

The same pattern applies to src/config/src/utils/record_batch_ext.rs, src/job/files/parquet.rs, and src/service/compact/merge.rs:

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (14)
src/config/src/utils/record_batch_ext.rs (4)

511-511: LGTM! The change to pass schema and arrays directly to RecordBatch::try_new improves efficiency.

The change removes unnecessary cloning of the schema, which is a good improvement.
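The pattern praised here — handing a shared `Arc<Schema>` to `RecordBatch::try_new` instead of deep-cloning the schema for every batch — can be sketched with simplified stand-in types (these are illustrative only, not the arrow-rs API):

```rust
use std::sync::Arc;

// Simplified stand-ins for arrow-rs's Schema and RecordBatch.
#[derive(Debug)]
struct Schema {
    fields: Vec<String>,
}

struct RecordBatch {
    schema: Arc<Schema>,
    columns: Vec<Vec<i64>>,
}

impl RecordBatch {
    // Taking `Arc<Schema>` by value means callers share one schema
    // allocation via a cheap pointer clone instead of duplicating
    // the whole field list per batch.
    fn try_new(schema: Arc<Schema>, columns: Vec<Vec<i64>>) -> Result<Self, String> {
        if columns.len() != schema.fields.len() {
            return Err("column count does not match schema".into());
        }
        Ok(RecordBatch { schema, columns })
    }
}

fn main() {
    let schema = Arc::new(Schema {
        fields: vec!["a".to_string(), "b".to_string()],
    });
    // Arc::clone copies only the reference count, not the field data.
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![vec![1, 2], vec![3, 4]])
        .expect("valid batch");
    assert_eq!(Arc::strong_count(&schema), 2);
    assert_eq!(batch.schema.fields.len(), 2);
}
```

Every batch built this way points at the same schema allocation, which is what makes dropping the extra `clone` worthwhile when many small batches are concatenated.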


Line range hint 555-558:
Verify error handling and logging.

Ensure that the error handling and logging within the sorting operation are sufficient.

Verification successful

Error handling and logging verified.

The merge_record_batches function includes error handling and logging mechanisms. Specifically, it logs an error when the _timestamp column is not found and uses the Result type to propagate errors. This ensures that errors are appropriately managed and logged within the sorting operation.

  • log::error! call found at: src/config/src/utils/record_batch_ext.rs, around line 508.
  • Result handling is used to propagate errors.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and logging in merge_record_batches function.

# Test: Check for error logs related to the sorting operation.
rg --type rust 'log::error!.*merge small files failed to find _timestamp column'

Length of output: 82


Script:

#!/bin/bash
# Description: Verify error handling and logging in merge_record_batches function.

# Test: Check for all error logs, Result handling, and panic calls related to the sorting operation.
# 1. Search for any log::error! calls within the function.
rg --type rust 'log::error!' src/config/src/utils/record_batch_ext.rs

# 2. Search for any Result handling within the function.
rg --type rust 'Result<' src/config/src/utils/record_batch_ext.rs

# 3. Search for any panic! calls within the function.
rg --type rust 'panic!' src/config/src/utils/record_batch_ext.rs

Length of output: 377


Script:

#!/bin/bash
# Description: Extract relevant code snippets around error handling and logging in merge_record_batches function.

# Extract 10 lines before and after each log::error! call.
rg --type rust 'log::error!' -A 10 -B 10 src/config/src/utils/record_batch_ext.rs

# Extract the merge_record_batches function definition and its surrounding 20 lines.
rg --type rust 'fn merge_record_batches' -A 20 -B 20 src/config/src/utils/record_batch_ext.rs

Length of output: 2422


Line range hint 589-589:
Verify error handling and logging.

Ensure that the error handling and logging within the merge operation are sufficient.

src/job/files/parquet.rs (5)

589-589: Verify error handling and logging.

Ensure that the error handling and logging within the merge operation are sufficient.

Verification successful

Error handling and logging are sufficient.

The code around the merge_parquet_files function includes appropriate error handling and logging, capturing detailed error messages and relevant context.

  • Instances of log::error! are used to log errors with detailed messages.
  • Errors are properly propagated using Err.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and logging in merge_files function.

# Test: Check for error logs related to the merge operation.
rg --type rust 'log::error!.*merge_parquet_files error for stream'

Length of output: 68


Script:

#!/bin/bash
# Find all instances of the merge_parquet_files function usage and display surrounding lines for context.
rg 'merge_parquet_files' -A 5 -B 5 --type rust

Length of output: 10086


Line range hint 200-200:
Verify error handling and logging.

Ensure that the error handling and logging within the run operation are sufficient.


Line range hint 200-200:
Verify error handling and logging.

Ensure that the error handling and logging within the file preparation operation are sufficient.


Line range hint 200-200:
Verify error handling and logging.

Ensure that the error handling and logging within the file moving operation are sufficient.

Verification successful

Error handling and logging are sufficiently implemented.

The src/job/files/parquet.rs file contains multiple instances of error handling and logging using log::error!, log::warn!, and Result. The specific snippet (file.to_str().unwrap().replace('\\', "/")) does not show direct error handling, but the overall context indicates that error handling is addressed elsewhere in the function.

  • Various instances of log::error!, log::warn!, and Result are present in the file.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and logging in move_files function.

# Test: Check for error logs related to the file moving operation.
rg --type rust 'log::error!.*Failed to remove parquet file from disk'

Length of output: 71


Script:

#!/bin/bash
# Description: Extract the move_files function to inspect error handling and logging mechanisms.

# Locate the move_files function definition and its content.
ast-grep --lang rust --pattern 'fn move_files($_) {
  $$$
}'

Length of output: 62


Script:

#!/bin/bash
# Description: List all function definitions in the src/job/files/parquet.rs file to inspect for error handling and logging.

# List all function definitions in the specified file.
ast-grep --lang rust --pattern 'fn $_($_) {
  $$$
}' src/job/files/parquet.rs

Length of output: 79


Script:

#!/bin/bash
# Description: Search for error handling and logging within the src/job/files/parquet.rs file.

# Search for logging macros and Result usage in the specified file.
rg --type rust 'log::error!|log::warn!|log::info!|Result' src/job/files/parquet.rs

Length of output: 1663


Line range hint 200-200:
Verify error handling and logging.

Ensure that the error handling and logging within the file scanning operation are sufficient.

Verification successful

Error handling and logging are sufficient.

The error handling and logging within the scan_wal_files function are comprehensive. Errors are logged with descriptive messages at various stages of the operation.

  • src/job/files/parquet.rs: Verified error handling and logging in scan_wal_files function.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify error handling and logging in scan_wal_files function.

# Test: Check for error logs related to the file scanning operation.
rg --type rust 'log::error!.*Failed to scan files'

Length of output: 148


Script:

#!/bin/bash
# Extract the scan_wal_files function from src/job/files/parquet.rs to review error handling and logging.
ast-grep --lang rust --pattern 'fn scan_wal_files($$$_) {
  $$$
}' --debug

Length of output: 294


Script:

#!/bin/bash
# Extract the scan_wal_files function from src/job/files/parquet.rs to review error handling and logging.
ast-grep --lang rust --pattern 'fn scan_wal_files($$$_) {
  $$$
}' --debug-query

Length of output: 154


Script:

#!/bin/bash
# Locate and extract the scan_wal_files function from src/job/files/parquet.rs to review error handling and logging.
rg -A 20 -B 5 'fn scan_wal_files' src/job/files/parquet.rs

Length of output: 849


Script:

#!/bin/bash
# Extract the full scan_wal_files function from src/job/files/parquet.rs to review error handling and logging.
rg -A 50 -B 5 'async fn scan_wal_files' src/job/files/parquet.rs

Length of output: 2071

src/service/compact/merge.rs (5)

615-625: Ensure non-zero record count before proceeding.

The check for new_file_meta.records == 0 is essential to avoid errors during merging. This validation is correctly placed.


645-647: Pass metadata directly to merge_parquet_files.

The merge_parquet_files function now requires metadata. Ensure that the metadata is correctly passed.


649-726: Handle schema evolution and conversion.

The code block for handling schema evolution and converting files to the latest schema version is well-structured. Ensure that the schema differences are correctly calculated and files are properly converted.


1072-1072: Pass metadata as a parameter.

The metadata parameter is added to the function signature, which is necessary for handling schema metadata.


Line range hint 1086-1112:
Construct schema with metadata.

The schema is constructed using the shared fields and metadata. Ensure that the metadata is correctly incorporated.

@hengfeiyang hengfeiyang force-pushed the fix/compactor-schema branch from 23da131 to 45adf44 Compare July 8, 2024 08:10

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 23da131 and 8f36150.

Files selected for processing (4)
  • src/config/src/utils/schema_ext.rs (2 hunks)
  • src/handler/http/request/search/mod.rs (1 hunks)
  • src/handler/http/request/search/multi_streams.rs (1 hunks)
  • src/service/compact/merge.rs (7 hunks)
Files skipped from review due to trivial changes (2)
  • src/handler/http/request/search/mod.rs
  • src/handler/http/request/search/multi_streams.rs
Additional context used
Path-based instructions (2)

The same pattern applies to src/config/src/utils/schema_ext.rs and src/service/compact/merge.rs:

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (5)
src/config/src/utils/schema_ext.rs (1)

30-30: LGTM! The retain method is well-implemented.

The addition of the retain method to the SchemaExt trait is clear and concise. It effectively filters the schema fields based on the provided HashSet.

Also applies to: 59-68
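The idea behind a `retain`-style method — keeping only fields whose names appear in a `HashSet<String>` — can be sketched with a minimal stand-in (the `Schema` and `Field` types here are simplified placeholders, not the actual `SchemaExt` trait from the PR):

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
}

#[derive(Debug, Clone)]
struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    // Keep only fields whose names appear in `keep`, preserving
    // the original field order.
    fn retain(&self, keep: &HashSet<String>) -> Schema {
        Schema {
            fields: self
                .fields
                .iter()
                .filter(|f| keep.contains(&f.name))
                .cloned()
                .collect(),
        }
    }
}

fn main() {
    let schema = Schema {
        fields: vec![
            Field { name: "_timestamp".into() },
            Field { name: "level".into() },
            Field { name: "message".into() },
        ],
    };
    let keep: HashSet<String> = ["_timestamp", "message"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let filtered = schema.retain(&keep);
    assert_eq!(filtered.fields.len(), 2);
    assert_eq!(filtered.fields[0].name, "_timestamp");
}
```

Returning a new schema rather than mutating in place keeps the original available for other readers, which matters when schemas are shared behind an `Arc`.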

src/service/compact/merge.rs (4)

600-601: Ensure accurate record count adjustment.

The code correctly adjusts the total_records when a file is deleted. This is crucial for maintaining accurate metadata.


618-629: Good handling of schema evolution.

The code correctly initializes FileMeta and ensures that records are not zero before proceeding. This prevents potential issues during file merging.


Line range hint 1089-1099:
Efficient handling of shared fields.

The code efficiently tracks shared fields across multiple files, which is essential for creating a consistent schema.


1112-1114: Correct schema retention logic.

The code correctly retains only the shared fields in the schema, ensuring that the merged file schema is consistent with the input files.
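The shared-fields tracking described above amounts to intersecting the field-name sets of every input file's schema, so the merged schema never carries a column that would be all-null in some file — which is the bug this PR fixes. A hypothetical helper (not the actual merge.rs code) illustrating the idea:

```rust
use std::collections::HashSet;

// Intersect the field names of every input file's schema. The result
// is the set of columns present in all files; retaining only these
// avoids introducing all-null columns in the merged output.
fn shared_fields(file_schemas: &[Vec<String>]) -> HashSet<String> {
    let mut iter = file_schemas.iter();
    let mut shared: HashSet<String> = match iter.next() {
        Some(first) => first.iter().cloned().collect(),
        None => return HashSet::new(),
    };
    for schema in iter {
        let names: HashSet<&str> = schema.iter().map(String::as_str).collect();
        shared.retain(|f| names.contains(f.as_str()));
    }
    shared
}

fn main() {
    let files = vec![
        vec!["_timestamp".to_string(), "level".to_string(), "msg".to_string()],
        vec!["_timestamp".to_string(), "msg".to_string()],
        vec!["_timestamp".to_string(), "msg".to_string(), "host".to_string()],
    ];
    let shared = shared_fields(&files);
    assert_eq!(shared.len(), 2);
    assert!(shared.contains("_timestamp") && shared.contains("msg"));
}
```

Seeding the set from the first file and shrinking it on each subsequent file keeps the pass linear in the total number of fields.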


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 8f36150 and e4f7bb4.

Files selected for processing (2)
  • .github/workflows/unit-tests.yml (1 hunks)
  • rust-toolchain.toml (1 hunks)
Files skipped from review due to trivial changes (2)
  • .github/workflows/unit-tests.yml
  • rust-toolchain.toml

@hengfeiyang hengfeiyang merged commit 842dbdb into main Jul 8, 2024
@hengfeiyang hengfeiyang deleted the fix/compactor-schema branch July 8, 2024 09:43
hengfeiyang added a commit that referenced this pull request Jul 8, 2024