feat: refactor merge file on compactor #4971
Conversation
Walkthrough
The pull request introduces new asynchronous functions for file retrieval and data processing in the caching and storage modules.
Changes
Possibly related PRs
Suggested reviewers
9268921 to 01220c1
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (3)
src/infra/src/storage/mod.rs (1)
83-86: Add documentation and improve error handling
As this is a public API, consider these improvements:
- Add rustdoc documentation with examples
- Wrap errors with more context using `.context()`
- Consider validating range values
Here's a suggested improvement:
```diff
+/// Retrieves a specific byte range from a file in storage.
+///
+/// # Arguments
+/// * `file` - The path to the file
+/// * `range` - The byte range to retrieve
+///
+/// # Examples
+/// ```
+/// let data = storage::get_range("path/to/file", 0..100).await?;
+/// ```
 pub async fn get_range(file: &str, range: Range<usize>) -> Result<bytes::Bytes, anyhow::Error> {
-    let data = DEFAULT.get_range(&file.into(), range).await?;
+    let data = DEFAULT
+        .get_range(&file.into(), range)
+        .await
+        .with_context(|| format!("Failed to get range from file: {}", file))?;
     Ok(data)
 }
```
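The diff above does not cover the third bullet (validating the range). A minimal sketch of such a guard, assuming the same `get_range` signature, could look like this:
```rust
// Hypothetical guard, not part of the suggested diff: reject empty or inverted
// ranges before touching object storage.
if range.start >= range.end {
    return Err(anyhow::anyhow!(
        "invalid byte range {}..{} for file {}",
        range.start,
        range.end,
        file
    ));
}
```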
src/service/search/cluster/flight.rs (1)
Line range hint 590-614: Consider optimizing node indexing in partition_file_by_hash.
The current implementation uses string-based HashMap lookups in a potentially hot path. Consider using integer IDs or a more efficient indexing mechanism to improve performance.
```diff
 let mut node_idx = HashMap::with_capacity(nodes.len());
 let mut idx = 0;
 for node in nodes {
     if !node.is_querier() {
         continue;
     }
-    node_idx.insert(&node.name, idx);
+    node_idx.insert(node.id, idx); // Use node.id instead of node.name
     idx += 1;
 }
 let mut partitions = vec![Vec::new(); idx];
 for fk in file_id_list {
     let node_name =
         infra_cluster::get_node_from_consistent_hash(&fk.id.to_string(), &Role::Querier, group)
             .await
             .expect("there is no querier node in consistent hash ring");
-    let idx = match node_idx.get(&node_name) {
+    let node_id = nodes.iter()
+        .find(|n| n.name == node_name)
+        .map(|n| n.id)
+        .expect("node not found");
+    let idx = match node_idx.get(&node_id) {
         Some(idx) => *idx,
         None => {
             log::error!(
                 "partition_file_by_hash: {} not found in node_idx",
                 node_name
             );
             0
         }
     };
     partitions[idx].push(fk.id);
 }
```
src/service/compact/merge.rs (1)
649-650: Avoid variable shadowing of `new_file_size`.
The variable `new_file_size` is redefined here after being previously used to accumulate file sizes before caching. This could lead to confusion or potential bugs. Consider renaming the variable to something like `total_original_size` for clarity.
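A minimal sketch of the suggested rename, keeping the accumulation expression unchanged (the `i64` annotation is an assumption about the type of `original_size`):
```rust
// Keep the first accumulator named `new_file_size`, and give the recomputed total its
// own name so the two quantities no longer shadow each other.
let total_original_size: i64 = new_file_list.iter().map(|f| f.meta.original_size).sum();
```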
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
- src/infra/src/cache/file_data/mod.rs (2 hunks)
- src/infra/src/storage/mod.rs (2 hunks)
- src/job/files/parquet.rs (1 hunks)
- src/service/compact/merge.rs (7 hunks)
- src/service/search/cluster/flight.rs (1 hunks)
- src/service/search/datafusion/exec.rs (5 hunks)
🧰 Additional context used
📓 Path-based instructions (6)
All six selected Rust files carry the same path-based instruction:
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
🔇 Additional comments (17)
src/infra/src/storage/mod.rs (1)
16-16: LGTM: Clean import addition
The Range import is properly placed and necessary for the new functionality.
src/infra/src/cache/file_data/mod.rs (1)
19-19: LGTM!
The Range import is correctly placed and necessary for the new range-based retrieval functionality.
src/service/search/cluster/flight.rs (1)
137-141: LGTM: Logging enhancement improves observability.
The added logging statement provides valuable operational insights into node availability and distribution.
src/service/search/datafusion/exec.rs (13)
29-29: Updated imports to include necessary DataFusion modules
The added imports for DataType, Schema, and DataFusion modules ensure that all required types and traits are available for the updated functions.
Also applies to: 33-33
47-47: Importing execute_stream for streaming execution
Including physical_plan::execute_stream enables efficient streaming execution of the physical plan, which can improve performance during data processing.
50-50: Added TryStreamExt for asynchronous stream handling
Importing futures::TryStreamExt provides the try_next method used for asynchronously iterating over the stream of record batches.
61-61: Imported NewUnionTable for table registration
Including NewUnionTable allows for the combination of multiple table providers into a single logical table, which is utilized in the merge_parquet_files function.
72-75: Refactored merge_parquet_files function signature
The updated function signature now accepts tables, bloom_filter_fields, and metadata as parameters, improving flexibility and modularity in how tables are merged.
Line range hint 80-94: Enhanced SQL query construction for data retrieval
The SQL query logic is refined to handle different stream_type conditions, ensuring that data is accurately retrieved and sorted based on the timestamp column.
101-104: Prepared DataFusion context with updated parameters
Setting sort_by_timestamp_desc and target_partitions before preparing the DataFusion context optimizes query execution according to the configuration.
106-107: Utilizing NewUnionTable for efficient data merging
Creating a union table from the provided tables simplifies data access and allows for efficient querying across multiple data sources.
109-112: Creating and executing logical and physical plans
The logical and physical plans are correctly generated from the SQL query, and the schema is obtained from the physical plan, ensuring accurate data processing.
114-115: Initializing Parquet writer with correct schema and metadata
The Parquet writer is properly initialized with the buffer, schema, bloom filter fields, and metadata, preparing it for writing the merged data.
116-134: Efficiently writing data to Parquet file in a streaming manner
The loop processes the data stream asynchronously, writing each batch to the Parquet file and handling errors appropriately. This approach optimizes memory usage and performance.
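A minimal sketch of this streaming pattern, assuming a DataFusion physical plan and an `AsyncArrowWriter`-style parquet writer; `physical_plan`, `ctx`, and `writer` stand in for values from the surrounding function, so this is illustrative rather than the exact code in `exec.rs`:
```rust
use datafusion::physical_plan::execute_stream;
use futures::TryStreamExt;

// Execute the physical plan as a stream and write record batches one at a time,
// so only a single batch needs to be resident in memory.
let mut stream = execute_stream(physical_plan, ctx.task_ctx())?;
while let Some(batch) = stream.try_next().await? {
    writer.write(&batch).await?;
}
// Flush buffered data and finalize the parquet footer.
writer.close().await?;
```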
134-134: Closing the Parquet writer properly
Calling writer.close().await?; ensures that all data is flushed to the buffer and resources are released, which is essential for data integrity.
144-144: Returning the merged schema and data buffer
The function returns the schema and the buffer containing the merged Parquet data as expected, completing the data merging operation.
src/job/files/parquet.rs (1)
717-717: Verify all references to merge_parquet_files_by_datafusion have been updated
With the replacement of merge_parquet_files_by_datafusion with merge_parquet_files at line 717, please ensure that all other references to the old function have been updated accordingly in the codebase.
Run the following script to verify:
✅ Verification successful
No remaining references to the old function found
The verification shows that there are no references to merge_parquet_files_by_datafusion in the codebase, and all occurrences are using the new merge_parquet_files function. The function is properly defined in src/service/compact/merge.rs and is being used consistently across the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for any remaining references to `merge_parquet_files_by_datafusion` in the codebase.
# Test: Use ripgrep to find any occurrences. Expect: No matches found.
rg 'merge_parquet_files_by_datafusion'
Length of output: 38
Script:
#!/bin/bash
# Let's verify both the old and new function names to understand the changes better
# First, check for the old function definition and usage
ast-grep --pattern 'fn merge_parquet_files_by_datafusion'
# Then check for the new function definition and usage
ast-grep --pattern 'fn merge_parquet_files'
# Also search with ripgrep for any string occurrences including in comments and tests
rg -A 2 'merge_parquet_files'
Length of output: 3563
Actionable comments posted: 9
🧹 Outside diff range and nitpick comments (31)
deploy/build/buildspec-tag-arm64-pi.yml (1)
19-21: Consider improving feature management in build scripts.
The build script currently uses multiple sed commands to modify features. Consider creating a dedicated configuration file or using build arguments to manage feature flags more maintainably.
Example approach:
```diff
   # disable gxhash
-  - sed -i 's/default = \[\"gxhash\"\]/default = []/g' src/config/Cargo.toml
-  - sed -i 's/+aes,//g' .cargo/config.toml
+  # Create a features.sh script
+  - |
+    cat > scripts/configure_features.sh << 'EOF'
+    #!/bin/bash
+    # Disable specific features for ARM64 Pi builds
+    features_to_disable=(
+      "s/default = \[\"gxhash\"\]/default = []/g:src/config/Cargo.toml"
+      "s/+aes,//g:.cargo/config.toml"
+    )
+    for feature in "${features_to_disable[@]}"; do
+      IFS=: read -r pattern file <<< "$feature"
+      sed -i "$pattern" "$file"
+    done
+    EOF
+  - chmod +x scripts/configure_features.sh
+  - ./scripts/configure_features.sh
```
🧰 Tools
🪛 yamllint
[error] 21-21: trailing spaces
(trailing-spaces)
src/config/src/meta/inverted_index/search.rs (1)
Line range hint 111-129: Consider adding error handling for bitmap size mismatches.
While the changes effectively implement the streaming approach, consider adding explicit error handling for cases where bitmap sizes might not match expectations. This could prevent potential runtime issues.
```diff
 let bitmap = index_reader.get_bitmap(value).await?;

 // Resize if the res map is smaller than the bitmap
 if res.len() < bitmap.len() {
+    // Consider adding a size sanity check here
+    if bitmap.len() > column_index_meta.max_bitmap_size {
+        return Err(anyhow::anyhow!("Bitmap size exceeds maximum allowed size"));
+    }
     res.resize(bitmap.len(), false);
 }
```
src/config/src/meta/inverted_index/writer.rs (2)
58-58: Document the rationale for bitmap size increment.
The change from `segment_id + 1` to `segment_id + 64` appears to be a performance optimization to reduce bitmap resizing frequency. However, the choice of 64 as the increment value should be documented.
Consider adding a comment explaining why 64 was chosen as the growth factor:
```diff
-    bitmap.resize(segment_id + 64, false);
+    // Pre-allocate space for 64 additional segments to reduce resize operations
+    bitmap.resize(segment_id + 64, false);
```
94-100: Consider pre-allocating writer capacity.
The implementation correctly serializes and writes metadata. However, since we know we'll be writing both the metadata and its size, consider pre-allocating the required capacity to avoid potential reallocations (the reserve call must come after `meta_bytes` exists):
```diff
 // write meta into writer buffer
 let meta_bytes = serde_json::to_vec(&self.meta)?;
+writer.reserve(std::mem::size_of::<u32>() + meta_bytes.len());
 writer.write_all(&meta_bytes)?;
 let metas_size = meta_bytes.len() as u32;
 writer.write_all(&metas_size.to_le_bytes())?;
 writer.flush()?;
```
src/config/src/utils/parquet.rs (2)
121-139: Add documentation for the new streaming reader functions
The implementation looks good, but these public functions would benefit from documentation comments explaining:
- Their purpose in the streaming context
- The difference from their non-streaming counterparts
- Memory usage benefits
Add rustdoc comments like this:
```rust
/// Returns a streaming reader for parquet data from bytes, optimized for memory efficiency.
///
/// This function enables processing large parquet files without loading the entire dataset
/// into memory at once.
pub async fn get_recordbatch_reader_from_bytes(
```
141-154: Add performance considerations to documentation
The implementation correctly provides backward compatibility while enabling streaming capabilities. Consider adding documentation that helps users choose between streaming (`get_recordbatch_reader_*`) and batch collection (`read_recordbatch_*`) based on their memory constraints.
Add a note about memory usage:
```rust
/// Collects all record batches into memory. For large files, consider using
/// `get_recordbatch_reader_from_bytes` instead to process data in a streaming fashion.
pub async fn read_recordbatch_from_bytes(
```
src/cli/basic/cli.rs (2)
294-294: Fix grammatical error in success message.
The message should use past tense for better grammar.
```diff
- println!("command {name} execute successfully");
+ println!("command {name} executed successfully");
```
Line range hint 266-270: Consider enhancing error handling consistency.
The error handling could be improved in two ways:
- Add file existence validation before attempting deletion
- Use consistent error message formatting across all commands (prefer using the `format!` macro)
```rust
match file_list::delete_parquet_file(file, true).await {
    Ok(_) => {
        println!("delete parquet file {} successfully", file);
    }
    Err(e) => {
        println!("{}", format!("delete parquet file {} failed: {}", file, e));
    }
}
```
src/job/files/idx.rs (1)
Line range hint 1-329: LGTM: Successful implementation of streaming parquet writing.
The changes effectively implement streaming parquet writing, which should significantly reduce memory usage during compaction. The code structure is clean and maintainable.
Consider adding metrics to monitor:
- Memory usage during compaction
- Time taken for index file generation
- Success/failure rates of file uploads
This will help validate the performance improvements and identify potential bottlenecks.
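One hedged way to add such instrumentation without assuming a particular metrics crate is to time the operation and log the outcome; `generate_index_file` below is a hypothetical stand-in for the real call:
```rust
use std::time::Instant;

// Illustrative wrapper: record duration and success/failure of index file generation.
let start = Instant::now();
match generate_index_file().await {
    Ok(path) => {
        log::info!("[JOB:IDX] generated index file {} in {:?}", path, start.elapsed());
    }
    Err(e) => {
        log::warn!("[JOB:IDX] index file generation failed after {:?}: {}", start.elapsed(), e);
    }
}
```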
src/service/dashboards/reports.rs (3)
382-382: Consider validating the email subject.
While the simplified subject assignment is good, consider adding validation to ensure the title is not empty before using it as the email subject. This would prevent emails with empty subjects which might affect deliverability or user experience.
```diff
- .subject(self.title.to_string());
+ .subject(if self.title.is_empty() {
+     "Dashboard Report".to_string()
+ } else {
+     self.title.to_string()
+ });
```
Line range hint 516-516: Replace fixed sleep durations with proper wait conditions.
The current implementation uses fixed sleep durations which could be unreliable across different environments and load conditions. Consider using proper wait conditions or exponential backoff instead.
```diff
- tokio::time::sleep(Duration::from_secs(5)).await;
+ // Wait for the dashboard to be fully loaded
+ page.wait_for_element("div#dashboard-container").await?;
+ page.wait_for_function("() => !document.querySelector('.loading-indicator')").await?;

- tokio::time::sleep(Duration::from_secs(2)).await;
+ // Wait for organization switch to complete
+ page.wait_for_function("() => window.location.href.includes(arguments[0])", vec![org_id.into()]).await?;
```
Also applies to: 595-595
Line range hint 489-499: Improve browser automation error handling.
Consider implementing proper cleanup using the `Drop` trait and providing more specific error messages during page navigation failures.
```rust
struct BrowserCleanup {
    browser: Browser,
    handle: tokio::task::JoinHandle<()>,
}

impl Drop for BrowserCleanup {
    fn drop(&mut self) {
        let browser = std::mem::replace(&mut self.browser, unsafe { std::mem::zeroed() });
        let handle = std::mem::replace(&mut self.handle, unsafe { std::mem::zeroed() });
        tokio::spawn(async move {
            if let Err(e) = browser.close().await {
                log::error!("Failed to close browser: {}", e);
            }
            if let Err(e) = handle.await {
                log::error!("Failed to join browser handler: {}", e);
            }
        });
    }
}
```
src/service/search/grpc/storage.rs (2)
Line range hint 519-541: Consider enhancing error context and optimizing string clone
- The error handling at line 540 could provide more context about the task failure.
- The `trace_id` string clone at line 519 could be avoided by using a reference.
Consider this improvement:
```diff
- let trace_id = query.trace_id.to_string();
+ let trace_id = &query.trace_id;
  let permit = semaphore.clone().acquire_owned().await.unwrap();
  // Spawn a task for each file, wherein full text search and
  // secondary index search queries are executed
  let task = tokio::task::spawn(async move {
```
And enhance the error context:
```diff
- .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?;
+ .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(
+     format!("Failed to join inverted index search tasks: {}", e)
+ )))?;
```
632-647: Consider improving error handling and search type constants
- Field access error handling could be more descriptive.
- Search type strings could be defined as constants.
Consider these improvements:
- Enhance field access error handling:
```diff
- if let Some(mut field_reader) = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await? {
+ let mut field_reader = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await?
+     .ok_or_else(|| anyhow::anyhow!("Field {} not found in index", INDEX_FIELD_NAME_FOR_ALL))?;
```
- Define search type constants:
```rust
const SEARCH_TYPE_EXACT: &str = "eq";
const SEARCH_TYPE_CONTAINS: &str = "contains";
const SEARCH_TYPE_PREFIX: &str = "prefix";
```
Also applies to: 670-673
src/service/compact/merge.rs (5)
433-434: Consider optimizing the FileSize strategy implementation.
The FileSize strategy's early break could lead to suboptimal merging when files are sorted by size. Consider accumulating files until reaching the optimal chunk size instead of breaking early.
```diff
-if job_strategy == MergeStrategy::FileSize {
-    break;
-}
-new_file_size = 0;
-new_file_list.clear();
-continue; // this batch don't need to merge, skip
+// Continue accumulating files even if we exceed the size limit
+// This ensures we get closer to the optimal chunk size
+new_file_list.push(file.clone());
+new_file_size += file.meta.original_size;
```
Also applies to: 456-461
657-658: Consider using fold for efficient record counting.
The separate mapping operations for records and file size could be combined into a single fold operation for better performance.
```diff
-let total_records = new_file_list.iter().map(|f| f.meta.records).sum();
-let new_file_size = new_file_list.iter().map(|f| f.meta.original_size).sum();
+let (total_records, new_file_size) = new_file_list.iter().fold((0, 0), |(records, size), f| {
+    (records + f.meta.records, size + f.meta.original_size)
+});
```
Also applies to: 668-668
1195-1211: Add input validation for schema diff generation.
Consider validating input parameters and handling edge cases.
```diff
 fn generate_schema_diff(
     schema: &Schema,
     schema_latest_map: &HashMap<&String, &Arc<Field>>,
 ) -> Result<HashMap<String, DataType>, anyhow::Error> {
+    if schema.fields().is_empty() {
+        return Err(anyhow::anyhow!("Empty schema provided"));
+    }
+
     let mut diff_fields = HashMap::new();
     // ... rest of the implementation
```
798-868: Improve error handling in FST index generation.
Consider adding more detailed error context when FST generation fails.
```diff
-generate_fst_inverted_index(
+generate_fst_inverted_index(
     &new_file_key,
     &full_text_search_fields,
     &index_fields,
     Some(&retain_file_list),
     schema,
     &mut reader,
-)
-.await?;
+)
+.await
+.map_err(|e| anyhow::anyhow!("Failed to generate FST index for {}: {}", new_file_key, e))?;
```
Line range hint 1113-1129: Optimize RecordBatch concatenation.
Consider pre-allocating the capacity for the concatenated batch to improve performance.
```diff
 let new_batch = if inverted_idx_batches.len() == 1 {
     inverted_idx_batches.remove(0)
 } else {
     let new_schema = inverted_idx_batches.first().unwrap().schema();
+    // Pre-calculate total rows for better memory allocation
+    let total_rows = inverted_idx_batches.iter().map(|b| b.num_rows()).sum();
+    // TODO: Add capacity hint to concat_batches function
     concat_batches(new_schema, inverted_idx_batches).map_err(anyhow::Error::from)?
 };
```
src/job/files/parquet.rs (3)
696-705: Consider enhancing error handling in merge operation.
The error handling could be improved by adding context information to the error chain. This would help in debugging issues when the merge operation fails.
```diff
-    let merge_result = merge_parquet_files_by_datafusion(
+    let merge_result = merge_parquet_files_by_datafusion(
         stream_type,
         &stream_name,
         schema,
         tables,
         &bloom_filter_fields,
         &new_file_meta,
-    ).await;
+    ).await.with_context(|| format!(
+        "Failed to merge parquet files for stream: {}/{}",
+        stream_type, stream_name
+    ));
```
1147-1177: Consider optimizing term processing for better performance.
The current implementation processes terms sequentially. Consider using parallel processing for large datasets:
```diff
-    let terms = (0..num_rows)
-        .flat_map(|i| {
-            split_token(column_data.value(i), &cfg.common.inverted_index_split_chars)
-                .into_iter()
-                .map(|s| (s, i))
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
+    let terms = (0..num_rows)
+        .into_par_iter()
+        .flat_map(|i| {
+            split_token(column_data.value(i), &cfg.common.inverted_index_split_chars)
+                .into_par_iter()
+                .map(|s| (s, i))
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
```
This would require adding `rayon` as a dependency for parallel iteration support.
1494-1506: Consider using a pre-allocated buffer for better memory efficiency.
The current implementation creates new buffers for each column. Consider pre-allocating a reusable buffer:
```diff
+let mut reusable_buf = Vec::with_capacity(1024 * 1024); // Pre-allocate 1MB
 for (column_name, indexer) in indexers {
     if indexer.is_empty() {
         continue;
     }
-    let mut buf = Vec::new();
-    let _index_meta = indexer.write(&mut buf).context(format!(
+    reusable_buf.clear(); // Reuse the buffer
+    let _index_meta = indexer.write(&mut reusable_buf).context(format!(
         "Error constructing FST ColumnIndex for field {}",
         column_name
     ))?;
-    original_size += buf.len();
-    puffin_writer.add_blob(column_name, buf)?;
+    original_size += reusable_buf.len();
+    puffin_writer.add_blob(column_name, reusable_buf.clone())?;
 }
```
src/handler/http/request/search/mod.rs (1)
150-169: Consider adding unit tests for the parameter setting logic.
The new parameter handling logic would benefit from unit tests to verify (a sketch follows after this list):
- Parameters are not overwritten when already set
- Error handling works as expected
- Empty/None cases are handled correctly
Would you like me to help create unit tests for these scenarios?
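A hedged sketch of what such tests could look like, written against a hypothetical `set_if_none` helper that mirrors the "only set when `None`" behavior described above; the real handler code may structure this differently:
```rust
// Hypothetical helper mirroring the conditional-set pattern under test.
fn set_if_none<T>(slot: &mut Option<T>, value: T) {
    if slot.is_none() {
        *slot = Some(value);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn existing_value_is_not_overwritten() {
        let mut search_type = Some("dashboards");
        set_if_none(&mut search_type, "ui");
        assert_eq!(search_type, Some("dashboards"));
    }

    #[test]
    fn none_value_is_set() {
        let mut search_type: Option<&str> = None;
        set_if_none(&mut search_type, "ui");
        assert_eq!(search_type, Some("ui"));
    }
}
```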
src/config/src/config.rs (1)
1590-1590: Improve error message clarity.
The error message could be more descriptive by including:
- The current invalid value
- The purpose of this configuration
Consider updating the error message to be more informative:
- "ZO_INVERTED_INDEX_STORE_FORMAT must be one of both, parquet." + "Invalid ZO_INVERTED_INDEX_STORE_FORMAT value. This setting controls how inverted indices are stored. Allowed values are: 'both' or 'parquet' (default)."src/config/src/meta/puffin/writer.rs (2)
171-174: Typographical error: variable `file_metdadata` should be `file_metadata`. There is a typo in the variable name
`file_metdadata`. Correcting it to `file_metadata` will improve code readability and prevent potential confusion.
Apply this diff to correct the typo:
```diff
 fn get_payload(&mut self) -> Result<Vec<u8>> {
-    let file_metdadata = PuffinMeta {
+    let file_metadata = PuffinMeta {
         blobs: mem::take(&mut self.blob_metadata),
         properties: mem::take(&mut self.file_properties),
     };
-    serde_json::to_vec(&file_metdadata).context("Error serializing puffin metadata")
+    serde_json::to_vec(&file_metadata).context("Error serializing puffin metadata")
 }
```
171-174: Inconsistent naming of blob metadata fields across structs
There is inconsistency in the naming of blob metadata fields:
- In PuffinBytesWriter, the field is `blobs_metadata`.
- In PuffinFooterWriter, it's `blob_metadata`.
- In PuffinMeta, it's `blobs`.
For consistency and improved readability, consider standardizing the field names across these structs.
src/common/utils/stream.rs (1)
92-101: Optimize min and max computation using Arrow compute functions
Manually iterating over each value to find min and max can be inefficient. Utilize Arrow's `compute::min` and `compute::max` functions for better performance.
Apply this diff to use compute functions:
```diff
+ use arrow::compute::{min as compute_min, max as compute_max};
  // ...
- for i in 0..num_row {
-     if min_col.is_valid(i) {
-         let val = min_col.value(i);
-         if val < min_val {
-             min_val = val;
-         }
-     }
-     if max_col.is_valid(i) {
-         let val = max_col.value(i);
-         if val > max_val {
-             max_val = val;
-         }
-     }
- }
+ if let Some(batch_min) = compute_min(min_col)? {
+     if batch_min < min_val {
+         min_val = batch_min;
+     }
+ }
+ if let Some(batch_max) = compute_max(max_col)? {
+     if batch_max > max_val {
+         max_val = batch_max;
+     }
+ }
```
src/config/src/meta/puffin/reader.rs (1)
222-227: Validate blob offsets without assuming continuity.
In the `validate_payload` function, the code assumes that each `blob.offset` is equal to the cumulative `offset`, which may not hold if blobs are not stored sequentially without gaps. This could lead to false positives when validating blob offsets. Consider verifying that each blob's offset and length are within the valid range without assuming continuity.
Consider modifying the validation logic as follows:
```diff
 for blob in &puffin_metadata.blobs {
-    ensure!(
-        blob.offset == offset,
-        anyhow!("Blob payload offset mismatch")
-    );
-    offset += blob.length;
+    ensure!(
+        blob.offset + blob.length <= self.head_magic_offset(),
+        anyhow!("Blob payload exceeds file boundaries")
+    );
 }
```
src/config/src/meta/search.rs (1)
Line range hint 711-743: Consider using the strum crate to derive Display and FromStr implementations
The manual implementations of the `Display` and `FromStr` traits for `SearchEventType` can be simplified by leveraging the strum crate's `Display` and `EnumString` derives. This reduces boilerplate code and improves maintainability.
Apply this diff to implement the change:
```diff
+use strum_macros::{Display, EnumString};

 #[derive(Hash, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
-#[serde(rename_all = "lowercase")]
+#[derive(Display, EnumString)]
+#[strum(serialize_all = "lowercase")]
 pub enum SearchEventType {
     UI,
     Dashboards,
     Reports,
     Alerts,
-    Values,
+    #[strum(serialize = "_values", serialize = "values")]
+    Values,
     Other,
     RUM,
-    DerivedStream,
+    #[strum(serialize = "derived_stream", serialize = "derivedstream")]
+    DerivedStream,
 }

-impl std::fmt::Display for SearchEventType {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            SearchEventType::UI => write!(f, "ui"),
-            SearchEventType::Dashboards => write!(f, "dashboards"),
-            SearchEventType::Reports => write!(f, "reports"),
-            SearchEventType::Alerts => write!(f, "alerts"),
-            SearchEventType::Values => write!(f, "_values"),
-            SearchEventType::Other => write!(f, "other"),
-            SearchEventType::RUM => write!(f, "rum"),
-            SearchEventType::DerivedStream => write!(f, "derived_stream"),
-        }
-    }
-}
-
-impl FromStr for SearchEventType {
-    type Err = String;
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        let s = s.to_lowercase();
-        match s.as_str() {
-            "ui" => Ok(SearchEventType::UI),
-            "dashboards" => Ok(SearchEventType::Dashboards),
-            "reports" => Ok(SearchEventType::Reports),
-            "alerts" => Ok(SearchEventType::Alerts),
-            "values" | "_values" => Ok(SearchEventType::Values),
-            "other" => Ok(SearchEventType::Other),
-            "rum" => Ok(SearchEventType::RUM),
-            "derived_stream" | "derivedstream" => Ok(SearchEventType::DerivedStream),
-            _ => Err(format!("Invalid search event type: {s}")),
-        }
-    }
-}
```
src/infra/src/file_list/postgres.rs (2)
Line range hint 82-84: Implement or remove the batch_add_with_id method
The `batch_add_with_id` method is currently unimplemented. If this method is intended for future use, please implement it. Otherwise, consider removing it to maintain code clarity and avoid potential confusion.
1520-1520: Adjust log level for successful operation
The log message indicates a successful operation but uses `log::warn!`. Consider changing it to `log::info!` to reflect the successful nature of the action.
Apply this diff to adjust the log level:
```diff
- log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) successfully");
+ log::info!("[POSTGRES] create table index(file_list_stream_file_idx) successfully");
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (30)
deploy/build/buildspec-tag-arm64-pi.yml(1 hunks)src/cli/basic/cli.rs(3 hunks)src/common/utils/stream.rs(6 hunks)src/config/src/config.rs(1 hunks)src/config/src/meta/inverted_index/mod.rs(0 hunks)src/config/src/meta/inverted_index/reader.rs(3 hunks)src/config/src/meta/inverted_index/search.rs(6 hunks)src/config/src/meta/inverted_index/writer.rs(3 hunks)src/config/src/meta/puffin/mod.rs(4 hunks)src/config/src/meta/puffin/reader.rs(4 hunks)src/config/src/meta/puffin/writer.rs(4 hunks)src/config/src/meta/search.rs(1 hunks)src/config/src/utils/inverted_index.rs(2 hunks)src/config/src/utils/parquet.rs(2 hunks)src/handler/http/request/search/mod.rs(1 hunks)src/infra/src/file_list/mysql.rs(1 hunks)src/infra/src/file_list/postgres.rs(1 hunks)src/infra/src/file_list/sqlite.rs(1 hunks)src/job/file_list.rs(1 hunks)src/job/files/idx.rs(4 hunks)src/job/files/parquet.rs(15 hunks)src/report_server/src/report.rs(1 hunks)src/service/alerts/alert.rs(2 hunks)src/service/compact/file_list.rs(1 hunks)src/service/compact/merge.rs(11 hunks)src/service/dashboards/reports.rs(1 hunks)src/service/db/file_list/remote.rs(1 hunks)src/service/search/cluster/flight.rs(1 hunks)src/service/search/grpc/storage.rs(6 hunks)src/service/search/sql.rs(1 hunks)
💤 Files with no reviewable changes (1)
- src/config/src/meta/inverted_index/mod.rs
✅ Files skipped from review due to trivial changes (6)
- src/infra/src/file_list/mysql.rs
- src/infra/src/file_list/sqlite.rs
- src/job/file_list.rs
- src/service/compact/file_list.rs
- src/service/db/file_list/remote.rs
- src/service/search/sql.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/service/search/cluster/flight.rs
🧰 Additional context used
📓 Path-based instructions (21)
Each of the 21 reviewed Rust files carries the same path-based instruction:
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
🔇 Additional comments (35)
deploy/build/buildspec-tag-arm64-pi.yml (1)
20-20: Verify the impact of removing AES CPU feature.
While removing the AES CPU feature might be necessary for compatibility with Pi devices, this could affect cryptographic performance. Please ensure this change has been tested on target Pi devices.
✅ Verification successful
Removing AES CPU feature is safe for ARM64 Pi builds
Based on the codebase analysis, removing the AES CPU feature from .cargo/config.toml is safe because:
- No direct cryptographic implementations using hardware AES acceleration were found in the Rust code
- The project's cryptographic operations are handled through high-level dependencies (like `argon2`, `jsonwebtoken`) which automatically fall back to software implementations when hardware features are unavailable
- This change only affects the ARM64 Pi build target, maintaining compatibility with devices that may not have AES hardware acceleration
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any cryptographic implementations that might be affected
# by the removal of AES CPU feature
# Look for potential crypto/AES usage in the codebase
rg -l 'aes|crypto|cipher' --type rust
# Check if there are any relevant configuration or feature flags in Cargo.toml files
fd Cargo.toml --exec grep -l 'aes\|crypto\|cipher' {}
Length of output: 97
Script:
#!/bin/bash
# Let's try a broader search to understand the context better
# 1. First, let's see what's in the .cargo/config.toml to understand the AES config
fd -g '.cargo/config.toml' --exec cat {}
# 2. Check for any dependencies in Cargo.toml that might use crypto features
fd Cargo.toml --exec cat {}
# 3. Look for any encryption-related code with broader terms
rg -l 'encrypt|decrypt|cipher|hash|crypto' --type-not lock
Length of output: 19705
src/config/src/meta/inverted_index/search.rs (4)
9-9: LGTM: Import change aligns with refactoring.
The import change correctly reflects the transition to using FieldReader.
89-96: LGTM: Consistent implementation with other search changes.
The changes align with the overall refactoring pattern.
#!/bin/bash
# Verify that all calls to ExactSearch::search have been updated
rg -p "ExactSearch.*?search" --type rust54-61: LGTM: Consistent implementation with SubstringSearch changes.
The changes maintain functional parity while simplifying the interface.
#!/bin/bash
# Verify that all calls to PrefixSearch::search have been updated
rg -p "PrefixSearch.*?search" --type rust26-33: LGTM: Simplified method signature improves code clarity.
The transition to FieldReader removes unnecessary generic constraints while maintaining functionality. This change supports the PR's goal of implementing a streaming approach.
✅ Verification successful
LGTM: Verified that all SubstringSearch usage is compatible with the changes
The search results confirm that:
- The only caller of `SubstringSearch::search` is in src/service/search/grpc/storage.rs and it's already using `FieldReader`
- The implementation in src/config/src/meta/inverted_index/search.rs is consistent across all search types (ExactSearch, PrefixSearch, SubstringSearch)
- No other usages of SubstringSearch::search were found that might be incompatible with the changes
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify that all calls to SubstringSearch::search have been updated
rg -p "SubstringSearch.*?search" --type rust
Length of output: 194
Script:
#!/bin/bash
# Let's try a broader search to find all usages of search method in the codebase
rg -p "search.*?FieldReader" --type rust -A 3 -B 3
# Also search for SubstringSearch struct definition and implementations
ast-grep --pattern 'impl $_ for SubstringSearch'
# And direct usage of SubstringSearch
rg "SubstringSearch" --type rust -A 3 -B 3
Length of output: 2762
src/config/src/meta/inverted_index/writer.rs (2)
78-79: LGTM! Efficient ownership transfer using std::mem::take.
Good optimization using take to avoid unnecessary cloning while iterating over the sorter.
88-90: Verify the necessity of resetting index_size to 0.
The index_size is reset to 0 before calculating offsets. This seems suspicious as it might affect the relative offset calculations. Please verify if this is intentional or if it should maintain its previous value.
✅ Verification successful
Let me gather more context about the usage of index_size in the writer implementation.
Let me try a different approach to get the context around the writer implementation.
Resetting index_size to 0 is correct and necessary
The index_size reset is intentional and correct. Here's why:
- The
index_sizeis used during the bitmap writing phase inappend_value()to track offsets for each bitmap (lines 117-124) - After all bitmaps are written and before FST is written, resetting to 0 is correct because:
- The bitmap writing phase is complete (all offsets are stored in FST)
- The FST and metadata are written after this point
- The stored offsets in FST remain valid as they were calculated during bitmap writing
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other usages of index_size to understand the impact
rg -p "index_size" --type rust
Length of output: 362
Script:
#!/bin/bash
# Get the full implementation context of the writer
ast-grep --pattern 'impl$_ InvertedIndexWriter {
$$$
}'
Length of output: 59
Script:
#!/bin/bash
# Get more context around the index_size usage in writer.rs
rg -p -B 10 -A 10 "index_size" src/config/src/meta/inverted_index/writer.rs
Length of output: 1862
src/config/src/utils/parquet.rs (1)
27-30: LGTM: Import changes align with new streaming functionality
The new imports support the transition to streaming parquet data processing, which is essential for reducing memory usage.
src/config/src/utils/inverted_index.rs (3)
69-71: Memory optimization looks good
The simplified implementation aligns well with the PR's objective of reducing memory usage during compaction. Direct buffer usage without intermediate steps is more efficient.
69-71: Verify if buffer validation is handled elsewhere
The previous implementation likely included validation of the buffer contents. Please confirm that this validation is now handled either:
- Inside
IndexReader::new - At a higher level before calling this function
#!/bin/bash
# Look for validation in IndexReader implementation
ast-grep --pattern 'impl IndexReader {
$$$
fn new($$$) {
$$$
}
$$$
}'
18-25: Verify error handling after removing anyhow macros
The removal of anyhow macros while keeping anyhow::Result suggests a simplified error handling approach. Please ensure that all error cases are still properly handled and propagated.
✅ Verification successful
Error handling is properly maintained after macro removal
The code shows a well-maintained error handling approach:
- The
create_index_reader_from_puffin_bytesfunction still returnsResult<IndexReader<...>>and handles errors appropriately - The function has been simplified to a direct constructor call that propagates errors through
Ok() - No complex error handling was needed after the simplification, making the
anyhowmacros removal justified - Other functions in the file use proper error handling through Option types and explicit returns
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining anyhow error handling patterns
rg -A 2 'anyhow!|ensure!' src/config/src/utils/
Length of output: 83
Script:
#!/bin/bash
# Let's check the error handling patterns in the inverted_index.rs file
rg -A 5 'Result<' src/config/src/utils/inverted_index.rs
# Also check for any error propagation using '?'
rg -A 2 '\?' src/config/src/utils/inverted_index.rs
# Look for the implementation of create_index_reader_from_puffin_bytes
ast-grep --pattern 'fn create_index_reader_from_puffin_bytes($$$) -> Result<$$$> {
$$$
}'
Length of output: 709
Script:
#!/bin/bash
# Let's look at the complete implementation of the file to understand the error handling
cat src/config/src/utils/inverted_index.rs
# Also check for any other files that might be using this module to ensure error handling is consistent
rg -l "use.*inverted_index" src/config/src/
Length of output: 10475
src/cli/basic/cli.rs (2)
135-135: LGTM! Clear and consistent message format.
The success message follows good CLI output conventions and clearly indicates the operation result.
266-266: LGTM! Well-structured success message with proper error handling.
The success message is consistent with the CLI's message format and is paired with appropriate error handling.
src/job/files/idx.rs (3)
27-29: LGTM: Import changes align with streaming parquet implementation.
The new imports support the refactored parquet writing functionality and enhanced record batch operations.
74-74: LGTM: Improved logging clarity with specific context.
The "[JOB:IDX]" prefix helps in better identifying index-related operations in logs.
Also applies to: 126-129, 133-133
100-105: Consider adding error handling for populate_file_meta.
While the FileMeta initialization is improved, the populate_file_meta call could benefit from more robust error handling. Consider handling the case where "min_ts" or "max_ts" columns don't exist.
- populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await?;
+ if let Err(e) = populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await {
+ log::warn!("[JOB:IDX] Failed to populate timestamp metadata: {}", e);
+ // Continue without timestamp metadata
+ }src/report_server/src/report.rs (1)
391-391: LGTM: Simplified string conversion
The change from format! to to_string() is more idiomatic and efficient for simple string conversion.
src/service/search/grpc/storage.rs (2)
138-138: LGTM: Improved log message formatting
The addition of a space before "ms" enhances log readability.
554-563: LGTM: Appropriate use of debug logging
The debug log messages provide useful information for troubleshooting while maintaining appropriate log levels.
src/service/alerts/alert.rs (2)
Line range hint 358-364: LGTM! Documentation improvement
The updated documentation clearly describes the return type and its components.
559-559: LGTM! Code simplification
The simplified email subject assignment is more idiomatic and maintains the same functionality.
src/service/compact/merge.rs (1)
1131-1193: LGTM with existing review comments.
The implementation looks good, noting that there's an existing review comment about handling semaphore permit acquisition errors.
src/job/files/parquet.rs (2)
792-799: LGTM! Well-structured index generation functions.
The index generation functions are well-organized with consistent parameter handling and proper error management.
Also applies to: 964-971
Line range hint 1-1517: Overall implementation looks solid and achieves the PR objectives.
The changes successfully implement a streaming approach for writing parquet files, which significantly reduces memory usage during compaction. The code is well-structured with proper error handling and logging. The suggested optimizations can be considered for future improvements.
src/handler/http/request/search/mod.rs (2)
150-161: LGTM: Improved error handling and conditional setting of search parameters.
The changes enhance robustness by:
- Only setting values if they are None
- Using proper error handling with match expressions
- Preserving existing values when present
164-169: LGTM: Consistent error handling for index_type.
The implementation follows the same robust pattern of conditional setting and error handling used for other parameters.
src/config/src/meta/puffin/mod.rs (5)
36-36: Addition of COMPRESSED flag is appropriate.
Defining the COMPRESSED flag in PuffinFooterFlags correctly expands the available footer flags and is implemented properly.
44-44: Renaming blob_metadata to blobs enhances clarity.
Updating the field name to blobs improves readability and aligns with common naming conventions.
58-58: Use of BlobTypes enum for blob_type improves type safety.
Changing blob_type to the BlobTypes enum enhances type safety and ensures only valid blob types are used.
94-103: Introduction of BlobTypes enum enhances serialization and maintainability.
Defining BlobTypes with appropriate variants and serde attributes ensures correct serialization/deserialization and improves code maintainability.
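For illustration only, this is the general shape of such an enum; the variant names and the exact serde attributes here are hypothetical, not the ones in `puffin/mod.rs`:
```rust
use serde::{Deserialize, Serialize};

// Hypothetical variants shown purely to illustrate enum-based blob typing with serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum BlobTypes {
    DeletionVectorV1,
    FstIndexV1,
}
```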
62-76: Ensure changing integer types to unsigned doesn't introduce issues.
Switching fields, snapshot_id, sequence_number, offset, and length to unsigned integers (u32, u64) prevents negative values, which is appropriate if these values should always be non-negative. Please verify this change doesn't affect existing data or logic that might have used negative values.
Run the following script to check for any negative assignments or comparisons:
✅ Verification successful
Based on the comprehensive search results, I can now provide a final response:
The change to unsigned integers is appropriate and safe.
The codebase analysis shows that:
- No negative assignments or comparisons were found for these fields
- The fields are used in ways consistent with unsigned values:
  - offset is used for file positions and timestamps
  - length represents sizes of data blocks
  - fields, snapshot_id, and sequence_number are used as counters/identifiers
- The arithmetic operations found (offset + length, offset + hour_micros(1), etc.) are all additive or multiplicative, with no need for negative values
The change aligns with the semantic meaning of these fields in the Puffin format, where they represent:
- File offsets (must be non-negative)
- Data lengths (must be non-negative)
- Sequence numbers (naturally non-negative)
- Field counts (naturally non-negative)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for negative assignments or comparisons involving the affected fields.
# Search for negative assignments
rg --type rust --pcre2 '(fields|snapshot_id|sequence_number|offset|length)\s*=\s*-\d+'
# Search for negative comparisons
rg --type rust --pcre2 '(fields|snapshot_id|sequence_number|offset|length)\s*[<>=!]=\s*-'
Length of output: 180
Script:
#!/bin/bash
# Let's perform a more comprehensive analysis:
# 1. Find all usages of these fields in the codebase
rg -A 2 -B 2 '\b(fields|snapshot_id|sequence_number|offset|length)\b' --type rust
# 2. Look for type declarations or casts involving these fields
ast-grep --pattern 'let $name: i32 = $value' --lang rust
ast-grep --pattern 'let $name: i64 = $value' --lang rust
# 3. Look for arithmetic operations that might assume signed integers
rg '\b(fields|snapshot_id|sequence_number|offset|length)\s*[-+*/]\s*' --type rust
# 4. Check for any test files that might reveal expected behavior
fd -e rs test -e spec -e tests --exec grep -l 'fields\|snapshot_id\|sequence_number\|offset\|length' {}
Length of output: 207545
src/config/src/meta/puffin/writer.rs (1)
161-161: Verify that PuffinFooterFlags::DEFAULT accurately reflects compression settings
The footer flags are set to PuffinFooterFlags::DEFAULT:
buf.extend_from_slice(&PuffinFooterFlags::DEFAULT.bits().to_le_bytes());
Please verify that this correctly represents the compression state of the blobs. If any blobs are compressed, the footer flags may need to be updated accordingly.
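A minimal sketch of how the flag could be made conditional; the `blobs_compressed` boolean is hypothetical:
```rust
// Pick the footer flag based on whether any blob payload was compressed.
let flags = if blobs_compressed {
    PuffinFooterFlags::COMPRESSED
} else {
    PuffinFooterFlags::DEFAULT
};
buf.extend_from_slice(&flags.bits().to_le_bytes());
```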
src/common/utils/stream.rs (2)
Line range hint 166-183: Test test_populate_file_meta is correctly implemented
The test accurately verifies populate_file_meta with default field names, ensuring proper functionality.
Line range hint 202-221: Test test_populate_file_meta_with_custom_field looks good
The test effectively checks populate_file_meta with custom min and max field names, confirming expected behavior.
src/config/src/meta/puffin/reader.rs (1)
83-86: Ensure callers handle the updated return type of get_metadata.
The return type of get_metadata has changed from Result<PuffinMeta> to Result<Option<PuffinMeta>>. This means that callers need to handle the Option case appropriately. Please verify that all usages of this method have been updated to accommodate the new return type to prevent potential None value handling issues.
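A small sketch of what a caller-side update could look like once `get_metadata` returns `Result<Option<PuffinMeta>>`, assuming an async reader; the error message is illustrative:
```rust
// Callers now need to decide what a missing footer means; here it is treated as an error.
let puffin_meta = match reader.get_metadata().await? {
    Some(meta) => meta,
    None => return Err(anyhow::anyhow!("puffin metadata not found in file")),
};
```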
Run the following script to find all usages of get_metadata:
Use streaming to write parquet files instead of obtaining the entire RecordBatch and writing to parquet, significantly reducing memory usage on the Compactor.
For example, with ZO_COMPACT_MAX_FILE_SIZE = 10240, the v0.13.0 merge step needs 10GB of memory, but with this PR it only needs 500MB.
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Documentation
Chores