@hengfeiyang (Contributor) commented Jun 14, 2024

New environment variables

  • ZO_COMPACT_FAST_MODE=true, enables fast-mode compaction, which is faster but uses more memory
  • ZO_FILE_MOVE_THREAD_NUM=0, number of threads for moving files on the ingester
  • ZO_MEM_DUMP_THREAD_NUM=0, number of threads for dumping memtables to disk on the ingester
  • ZO_FILE_MERGE_THREAD_NUM=0, number of threads for merging files on the compactor
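
As a rough illustration of how a thread-count setting like these might be resolved at startup, the sketch below treats 0 as "derive the value from the CPU count". That reading of 0 is an assumption (the description above only gives the defaults), and resolve_thread_num and file_merge_thread_num are hypothetical helpers, not the project's config code.

```rust
use std::thread;

/// Hypothetical helper: turn a raw env value into a worker count.
/// Assumption: `0`, a missing value, or an unparsable value all mean
/// "fall back to `cpu_num`" (never less than one worker).
pub fn resolve_thread_num(raw: Option<&str>, cpu_num: usize) -> usize {
    match raw.and_then(|v| v.trim().parse::<usize>().ok()) {
        Some(n) if n > 0 => n,
        _ => cpu_num.max(1),
    }
}

/// Reads one of the new variables from the process environment.
pub fn file_merge_thread_num() -> usize {
    let cpu_num = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let raw = std::env::var("ZO_FILE_MERGE_THREAD_NUM").ok();
    resolve_thread_num(raw.as_deref(), cpu_num)
}
```

Keeping the parsing in a pure function makes the fallback rule easy to unit test without touching the real environment.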

Update

  • Optimize file_list index
  • Optimize MemTable dump to disk by using workers and channels
  • Optimize compactor merging to reduce memory usage

@hengfeiyang hengfeiyang requested a review from oasisk June 14, 2024 10:46
@github-actions bot added the ☢️ Bug and ✏️ Feature labels Jun 14, 2024
@coderabbitai bot (Contributor) commented Jun 14, 2024

Walkthrough

The changes encompass configuration updates, SQL query adjustments, improvements in file handling, and refined control flow in data ingestion and processing. Key modifications include new attributes in configuration structs, reordering SQL parameters, enhanced logging, and a multi-threaded approach for data persistence. These updates streamline performance, enhance configuration flexibility, and improve error handling and logging.

Changes

| Files/Groups | Change Summary |
| --- | --- |
| src/config/src/config.rs | Reordered attributes in the Common struct and added new ones in the Limit struct. Added fast_mode in the Compact struct. Updated defaults in init. |
| src/config/src/utils/record_batch_ext.rs, src/handler/http/request/... | Replaced less_memory_mode with fast_mode in concat_batches. Made stream_name formatting conditional in stream/mod.rs. |
| src/infra/src/file_list/mysql.rs, src/infra/src/file_list/postgres.rs, src/infra/src/file_list/sqlite.rs | Adjusted SQL queries and index definitions to swap parameter order and columns. |
| src/ingester/src/errors.rs | Added a new error variant TokioMpscSendError to the Error enum. |
| src/ingester/src/immutable.rs | Added the PROCESSING_TABLES variable, added a tx parameter to the persist function, and added a persist_table function. |
| src/ingester/src/lib.rs | Major control flow changes: introduced a run() function for multi-threaded data persistence and enhanced task management using Tokio. |
| src/ingester/src/partition.rs | Changed the schema fields length check condition from > to >=. |
| src/ingester/src/writer.rs | Updated log context from WAL to MEM. |
| src/job/compactor.rs | Changed the thread limit parameter from file_move_thread_num to file_merge_thread_num in the run function. |
| src/job/files/parquet.rs | Removed write_recordbatch_to_parquet, consolidated merge_parquet_files, updated error logging, and refactored the merge_files function. |
| src/job/flatten_compactor.rs, src/service/compact/file_list.rs, src/service/compact/flatten.rs, src/service/compact/merge.rs, src/service/compact/mod.rs | Updated thread management by replacing references to file_move_thread_num with file_merge_thread_num. |

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6

Outside diff range and nitpick comments (4)
src/job/files/parquet.rs (3)

35-35: Consider adding documentation for the merge_parquet_files function to clarify its purpose and usage.


321-321: Consider adding a comment here to explain why the condition checks the number of fields against file_move_fields_limit.


582-582: The branching logic here is clear and well-implemented. Consider adding a comment explaining why different strategies are used for different stream types.

src/config/src/config.rs (1)

867-871: Validate the default value for fast_mode in the Compact struct.

The default value for fast_mode is set to true, which implies a significant memory usage increase. It's crucial to ensure that this default setting aligns with typical use cases and system capabilities. Consider adding a warning or note in the documentation about the potential high memory usage.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between feb5d1d and 9287667.

Files selected for processing (18)
  • src/config/src/config.rs (5 hunks)
  • src/config/src/utils/record_batch_ext.rs (2 hunks)
  • src/handler/http/request/stream/mod.rs (1 hunks)
  • src/infra/src/file_list/mysql.rs (2 hunks)
  • src/infra/src/file_list/postgres.rs (2 hunks)
  • src/infra/src/file_list/sqlite.rs (2 hunks)
  • src/ingester/src/errors.rs (1 hunks)
  • src/ingester/src/immutable.rs (4 hunks)
  • src/ingester/src/lib.rs (3 hunks)
  • src/ingester/src/partition.rs (1 hunks)
  • src/ingester/src/writer.rs (3 hunks)
  • src/job/compactor.rs (1 hunks)
  • src/job/files/parquet.rs (9 hunks)
  • src/job/flatten_compactor.rs (1 hunks)
  • src/service/compact/file_list.rs (1 hunks)
  • src/service/compact/flatten.rs (1 hunks)
  • src/service/compact/merge.rs (4 hunks)
  • src/service/compact/mod.rs (1 hunks)
Additional context used
Path-based instructions (18)
The same instruction block was applied to each of the 18 files listed above:

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (26)
src/job/flatten_compactor.rs (1)

40-40: Updated thread handling to use file_merge_thread_num aligns with the global configuration change. This ensures that the number of threads used for merging is consistent across the application.

Also applies to: 43-43

src/ingester/src/errors.rs (1)

103-105: The addition of TokioMpscSendError is appropriate for handling errors that may occur during asynchronous message passing operations. This enhances error handling capabilities within the application.

src/ingester/src/lib.rs (1)

26-36: The restructuring and introduction of new asynchronous functions to handle data persistence are well-implemented. Using mpsc::channel for thread communication and managing data persistence tasks asynchronously are modern Rust practices that enhance the application's performance and maintainability.

Also applies to: 65-71, 74-115
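
The worker-and-channel pattern described in this comment can be sketched roughly as follows. This is a minimal sketch, not the PR's implementation: per the summary above the PR uses Tokio tasks and tokio mpsc channels, whereas this version uses std threads and std::sync::mpsc so it stays dependency-free. dump_with_workers and the ".parquet" naming are hypothetical stand-ins for the real persistence step.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical sketch: fan a list of table names out to `worker_num`
/// workers over a channel and collect the names of the "written" files.
pub fn dump_with_workers(tables: Vec<String>, worker_num: usize) -> Vec<String> {
    let (task_tx, task_rx) = mpsc::channel::<String>();
    let (done_tx, done_rx) = mpsc::channel::<String>();
    // std receivers are single-consumer, so the workers share one behind a mutex.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let mut handles = Vec::new();
    for _ in 0..worker_num.max(1) {
        let task_rx = Arc::clone(&task_rx);
        let done_tx = done_tx.clone();
        handles.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement,
            // so the lock is not held while the table is processed.
            let next = task_rx.lock().unwrap().recv();
            match next {
                Ok(table) => {
                    // Placeholder for the real "dump memtable to disk" work.
                    done_tx.send(format!("{table}.parquet")).unwrap();
                }
                // The task sender was dropped and the queue is empty: stop.
                Err(_) => break,
            }
        }));
    }
    // Drop the original sender so `done_rx` ends once every worker exits.
    drop(done_tx);

    for table in tables {
        task_tx.send(table).unwrap();
    }
    // Closing the task channel tells the workers there is no more work.
    drop(task_tx);

    let mut done: Vec<String> = done_rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    done.sort();
    done
}
```

Dropping the task sender is what shuts the pool down here; an async version would typically rely on the same "channel closed" signal.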

src/job/compactor.rs (1)

43-43: The updates to thread handling using file_merge_thread_num are consistent with changes across other modules, ensuring uniformity in configuration usage. The asynchronous spawn of tasks for merging operations is correctly implemented, promoting efficient resource utilization and scalability.

Also applies to: 46-46

src/ingester/src/immutable.rs (3)

Line range hint 108-132: The new persist function introduces asynchronous handling of paths. Ensure that the error handling is robust, especially with potential SendError from the tx.send call.

Verification successful

The persist function handles potential SendError from the tx.send call by using the context(TokioMpscSendSnafu) method, which ensures robust error handling.

  • src/ingester/src/immutable.rs: tx.send(path.clone()).await.context(TokioMpscSendSnafu)?;
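
For readers unfamiliar with the pattern, the idea of converting a channel send failure into a domain error can be sketched without snafu. This is an assumption-laden stand-in: it uses std::sync::mpsc and map_err instead of tokio's channel and snafu's context, and the Error enum, its MpscSend variant, and enqueue are hypothetical names, not the PR's code.

```rust
use std::fmt;
use std::sync::mpsc::{self, SendError};

/// Hypothetical domain error with one variant for a failed channel send.
#[derive(Debug)]
pub enum Error {
    MpscSend { path: String },
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::MpscSend { path } => {
                write!(f, "failed to queue {path}: receiver dropped")
            }
        }
    }
}

impl std::error::Error for Error {}

/// Queues `path` for persistence. A send only fails when the receiving
/// side is gone, and that failure is surfaced as `Error::MpscSend`
/// instead of panicking.
pub fn enqueue(tx: &mpsc::Sender<String>, path: &str) -> Result<(), Error> {
    tx.send(path.to_string())
        .map_err(|SendError(path)| Error::MpscSend { path })
}
```

The failed send hands the unsent value back inside SendError, which is why the error variant can carry the path without cloning it again.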
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify error handling paths in the new `persist` function.
ast-grep --lang rust --pattern $'fn persist($_, $_) { $$$ }'

Length of output: 61



Script:

#!/bin/bash
# Description: Verify error handling for the `tx.send` call within the `persist` function.
rg 'tx\.send'

Length of output: 1600


19-25: Ensure appropriate use of added imports and check for any unused ones to maintain clean and efficient code.

Verification successful

The script output indicates that the imports in the src/ingester/src/immutable.rs file are indeed used within the codebase. This confirms that the added imports are necessary and not redundant.

  • hashbrown::HashSet is used.
  • once_cell::sync::Lazy is used.
  • snafu::ResultExt is used.
  • tokio::fs and tokio::sync::{mpsc, RwLock} are used.

Thus, there are no unused imports in the specified lines of src/ingester/src/immutable.rs.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for unused imports in the file.
ast-grep --lang rust --pattern $'use $_;'

Length of output: 291690


134-179: The persist_table function introduces detailed logging and error handling. Verify that the logging levels are appropriately set and that performance metrics are updated correctly after operations.

Verification successful

The persist_table function in src/ingester/src/immutable.rs correctly includes detailed logging statements and updates performance metrics after operations.

  • Logging levels are set to info for both the start and finish of the persistence process.
  • Performance metrics are updated correctly with metrics::INGEST_MEMTABLE_BYTES, metrics::INGEST_MEMTABLE_ARROW_BYTES, and metrics::INGEST_MEMTABLE_FILES.

Locations in the codebase:

  • Logging statements: src/ingester/src/immutable.rs lines related to log::info!
  • Performance metrics updates: src/ingester/src/immutable.rs lines related to metrics::INGEST_MEMTABLE_BYTES
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify logging and performance metrics in `persist_table`.
ast-grep --lang rust --pattern $'fn persist_table($_, $_) { $$$ }'

Length of output: 67



Script:

#!/bin/bash
# Search for the `persist_table` function definition and its content
rg 'async fn persist_table' -A 30

# Check for logging statements within the `persist_table` function
rg 'log::info!' -A 5

# Verify performance metrics update within the `persist_table` function
rg 'metrics::INGEST_MEMTABLE_BYTES' -A 5

Length of output: 115491

src/ingester/src/partition.rs (1)

111-111: The condition change in the partition's persistence logic needs to be carefully tested to ensure it does not introduce any regressions or unexpected behaviors.
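
The behavioural difference between > and >= shows up only at the boundary value, which is the case a regression test should pin down. The sketch below is illustrative only: what the field count is compared against is not shown in this review, so limit is a stand-in and both function names are hypothetical.

```rust
/// Old form of the check: true only when the count is strictly above the limit.
pub fn over_limit_exclusive(field_count: usize, limit: usize) -> bool {
    field_count > limit
}

/// New form of the check: also true when the count is exactly at the limit.
pub fn over_limit_inclusive(field_count: usize, limit: usize) -> bool {
    field_count >= limit
}

/// Returns the field counts (from 0 up to and including `max_count`)
/// for which the two checks disagree.
pub fn disagreements(limit: usize, max_count: usize) -> Vec<usize> {
    (0..=max_count)
        .filter(|&n| over_limit_exclusive(n, limit) != over_limit_inclusive(n, limit))
        .collect()
}
```

The two checks disagree only when the count equals the limit, so a test at that exact value is enough to catch an accidental revert.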

src/service/compact/flatten.rs (1)

47-47: The update to utilize file_merge_thread_num for semaphore initialization aligns with the configuration changes. Ensure that this value is appropriately tuned for system performance.

src/ingester/src/writer.rs (1)

145-145: The updated logging statements provide clearer information on file operations. Ensure that these logs are consistent and provide sufficient detail for debugging.

Also applies to: 210-210, 236-238

src/handler/http/request/stream/mod.rs (1)

99-102: The conditional formatting of stream_name based on the configuration flag skip_formatting_bulk_stream_name is a good approach for flexibility. However, ensure that this configuration flag is properly documented and tested to avoid confusion about its usage and effects.

src/service/compact/file_list.rs (1)

227-227: The semaphore is initialized with the number of threads specified by file_merge_thread_num from the configuration. This is a good use of resources, ensuring that the number of concurrent tasks is controlled. Make sure to handle potential errors when acquiring a semaphore permit to prevent deadlocks or resource starvation.
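
To make the concurrency-limiting idea concrete, here is a minimal sketch of a counting semaphore capping how many jobs run at once. The semaphore type used in the PR is not shown in this review; this version is built from std's Mutex and Condvar purely for illustration, and Semaphore and run_limited are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (illustrative stand-in).
pub struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cv: Condvar::new() }
    }

    /// Blocks until a permit is available, then takes it.
    pub fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cv.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Runs `jobs` short tasks with at most `limit` in flight at once and
/// returns the peak concurrency that was observed.
pub fn run_limited(jobs: usize, limit: usize) -> usize {
    // A limit of 0 would block forever, so clamp it to at least 1.
    let sem = Arc::new(Semaphore::new(limit.max(1)));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, running, peak) =
                (Arc::clone(&sem), Arc::clone(&running), Arc::clone(&peak));
            thread::spawn(move || {
                sem.acquire();
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Stand-in for a merge job.
                thread::sleep(Duration::from_millis(5));
                running.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

In this sketch a zero limit is clamped rather than left to hang, which is one way to address the starvation concern raised in the comment; whether the project does the same is not shown here.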

src/service/compact/mod.rs (1)

187-187: Initializing the semaphore with file_merge_thread_num for controlling concurrency in the run_merge function is appropriate. This ensures that the system's performance is optimized by not overloading it with too many simultaneous merge operations. It's crucial to monitor the system's performance to ensure that this setting does not become a bottleneck.
[APPROVED]

src/config/src/utils/record_batch_ext.rs (1)

Line range hint 487-510: The change from less_memory_mode to fast_mode in concat_batches function adjusts the logic to potentially prioritize performance over memory usage. Ensure that all upstream calls to this function have been updated to use the new parameter and understand the implications of this change.

Verification successful

Let's proceed by checking the specific lines where concat_batches is called to ensure they use the new fast_mode parameter.


Let's correct the regex pattern to properly match the function calls and re-run the script.


The updated script successfully extracted the relevant lines, allowing us to verify the parameters used in the concat_batches function calls. Here's what we found:

  • In src/service/compact/merge.rs, the concat_batches function is called with schema.clone(), record_batches, and get_config().compact.fast_mode, which corresponds to the new fast_mode parameter.

  • Other instances in src/service/compact/flatten.rs, src/handler/grpc/request/event.rs, and src/job/files/parquet.rs use arrow::compute::concat_batches, which seems to be a different function and does not involve the fast_mode parameter.

The review comment's concern about updating the parameter appears to have been addressed correctly in the relevant parts of the codebase.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all function calls to `concat_batches` match the new signature.

# Test: Search for the function usage. Expect: Only occurrences of the new signature.
rg --type rust $'concat_batches'

Length of output: 589



Script:

#!/bin/bash
# Extract lines where concat_batches is called and display surrounding context to verify parameters used.

# Extract lines and context
rg --type rust -A 3 -B 3 $'concat_batches('

Length of output: 125



Script:

#!/bin/bash
# Extract lines where concat_batches is called and display surrounding context to verify parameters used.

# Extract lines and context
rg --type rust -A 3 -B 3 'concat_batches\('

Length of output: 2462

src/infra/src/file_list/postgres.rs (3)

309-314: The reordering of conditions in the SQL query for better performance is a good practice, especially if max_ts is more selective than min_ts.


803-803: Updating the index to prioritize max_ts over min_ts can improve the efficiency of queries that filter based on these columns. Ensure that the data distribution justifies this change.


811-811: Similar to the previous comment, this change in the index order should be validated against actual data usage patterns to ensure it benefits query performance.
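
The time-range filter that these index comments refer to can be modelled as an interval-overlap test. The actual SQL is not quoted in this review, so the sketch below assumes the query selects files whose [min_ts, max_ts] range overlaps the requested range with inclusive bounds; FileMeta, overlaps, and select_files are hypothetical names used only for illustration.

```rust
/// Hypothetical, simplified view of a file_list row.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    pub name: String,
    pub min_ts: i64,
    pub max_ts: i64,
}

/// A file overlaps [start, end] when it ends at or after `start`
/// and begins at or before `end` (inclusive bounds are an assumption).
pub fn overlaps(file: &FileMeta, start: i64, end: i64) -> bool {
    file.max_ts >= start && file.min_ts <= end
}

/// In-memory equivalent of the assumed WHERE clause: returns the names
/// of all files overlapping the requested range, in input order.
pub fn select_files(files: &[FileMeta], start: i64, end: i64) -> Vec<&str> {
    files
        .iter()
        .filter(|f| overlaps(f, start, end))
        .map(|f| f.name.as_str())
        .collect()
}
```

Both conditions are needed for correctness regardless of order; putting max_ts first in the index only changes which condition the database can use to narrow the scan first.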

src/infra/src/file_list/mysql.rs (3)

306-311: The SQL query conditions have been reordered to optimize the search based on time ranges. This change should enhance the performance of queries by leveraging the new index structure.


798-798: The updated index order should optimize the database's performance for queries involving time ranges. Ensure that all related queries use this index effectively.


806-806: Similar to the previous comment, updating the index order in the file_list_history table is a good optimization step. Make sure the application's logic aligns with these changes to fully benefit from the index restructuring.

src/infra/src/file_list/sqlite.rs (2)

316-321: The updated SQL query condition is now consistent with the reordered parameters for time range selection.


817-817: The index creation statements have been updated to reflect the new order of columns (max_ts, min_ts) which should optimize query performance based on the new query conditions.

Also applies to: 825-825

src/job/files/parquet.rs (1)

85-85: The error logging here is detailed and should be very helpful for debugging issues with moving parquet files.

src/service/compact/merge.rs (3)

304-304: Use of semaphore for concurrency control aligns with the updated configuration. This ensures that the number of concurrent merge operations is controlled by file_merge_thread_num.


688-688: The consolidation of merge_parquet_files functions under one unified function simplifies the codebase and reduces potential errors from handling multiple versions. Ensure that all scenarios previously covered by the versioned functions are still adequately handled.


1076-1080: The use of fast_mode in concat_batches is a significant alteration that optimizes performance based on configuration. Confirm that this change aligns with the intended use cases and performance benchmarks.
