refactor: compactor working with job queue #3761

Conversation
Walkthrough

This update enhances job management and processing capabilities within the codebase. Key changes include adding new fields to configuration structs, updating error messages, and introducing functions for precise time measurement. Significant improvements in file and job handling, including new methods for managing merge jobs, a refined merging process, and updated logging, streamline operations and ensure more robust error handling.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Config
    participant Compactor
    participant JobManager
    participant Logger
    User->>Compactor: Initiate Job
    Compactor->>Config: Load Config
    Config-->>Compactor: Send Config Values
    Compactor->>JobManager: Generate Job
    JobManager->>JobManager: Add Job to Queue
    JobManager->>Compactor: Send job ID and offsets
    Compactor->>Logger: Log Job Details
    JobManager->>JobManager: Check Running Jobs
    JobManager->>Logger: Log Job Status
    Note right of Logger: Enhanced logging details
    Logger-->>Config: Log configuration changes
    Logger-->>Compactor: Log Compactor activities
```
```mermaid
sequenceDiagram
    participant System
    participant Timestamp
    participant FileList
    participant Job
    participant StreamType
    System->>Timestamp: Call now_micros()
    Timestamp-->>System: Current Timestamp in Microseconds
    System->>FileList: Manage Merge Jobs
    FileList->>Job: Create/Update Job Records
    FileList->>StreamType: Get Stream Type
    StreamType-->>FileList: Stream Type Data
    FileList->>System: Job Details for Merge
```
Actionable comments posted: 17
Outside diff range and nitpick comments (12)
src/service/compact/mod.rs (1)
Line range hint 170-249: The function `run_generate_job` should handle exceptions more robustly, especially considering it interacts with multiple external systems.
src/infra/src/file_list/mysql.rs (4)
714-722: Review the necessity of updating `updated_at` without other changes.
The method `update_running_jobs` only updates the `updated_at` field. If this is intended to refresh the job status implicitly, consider adding a comment explaining this. Otherwise, review whether this update is necessary, as it may lead to unnecessary write load on the database.
724-739: Improve logging for reset operations.
The method `check_running_jobs` logs a warning when resetting jobs, which might be confused with an error. Consider changing this to an informational log unless the reset indicates an abnormal situation.

```rust
if ret.rows_affected() > 0 {
    log::info!("[MYSQL] reset running jobs status to pending");
}
```
919-934: Ensure the new table structure supports future scalability.
The creation of `file_list_jobs` with fields like `offsets` and `status` is crucial for job management. Consider indexing more columns if query performance becomes an issue as the data grows, especially on fields frequently used in WHERE clauses.
994-1001: Review the necessity of multiple indexes on `file_list_jobs`.
Multiple indexes have been created on `file_list_jobs`. While indexes improve query performance, they also increase storage and maintenance overhead. Review whether all these indexes are necessary or if some can be combined for efficiency.
src/infra/src/file_list/postgres.rs (3)
741-756: Add logging for successful job resets in `check_running_jobs`.
It's good to have logging for when jobs are reset to pending, but it might also be useful to log when the operation completes successfully without any resets. This could help in debugging and understanding the system's behavior under normal conditions.
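A minimal sketch of logging both outcomes; the query text mirrors the reset logic described above, while the concrete bind values are assumptions for the example:

```rust
// Sketch only: log both the reset path and the no-op path.
async fn check_running_jobs(pool: &sqlx::PgPool, before: i64) -> Result<(), sqlx::Error> {
    let ret = sqlx::query(
        "UPDATE file_list_jobs SET status = $1 WHERE status = $2 AND updated_at < $3;",
    )
    .bind(0_i32) // assumed encoding for Pending
    .bind(1_i32) // assumed encoding for Running
    .bind(before)
    .execute(pool)
    .await?;
    if ret.rows_affected() > 0 {
        log::info!("[POSTGRES] reset {} running jobs to pending", ret.rows_affected());
    } else {
        log::debug!("[POSTGRES] no running jobs needed resetting");
    }
    Ok(())
}
```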
758-769: Consider adding metrics or monitoring for the `clean_done_jobs` method.
The method `clean_done_jobs` is crucial for maintaining the health of the job queue. Consider adding metrics or monitoring around this method to track how often it's called and its performance, which could help in proactive system maintenance.
1013-1020: Validate the necessity of unique and status indexes on `file_list_jobs`.
The unique index on `stream` and `offsets` and the status index might be crucial for performance, but it's also important to validate their necessity to avoid over-indexing, which can slow down write operations.
src/service/compact/merge.rs (3)
Line range hint 80-195: Consider handling potential errors more gracefully in `generate_job_by_stream`.
The function `generate_job_by_stream` returns early in multiple places without logging the reason for these early returns, which could make debugging difficult. Consider adding logging before each return statement to provide more context on why a particular path was taken.
Line range hint 76-195: Refactor the `generate_job_by_stream` function to improve readability.
The function `generate_job_by_stream` is quite long and handles multiple levels of logic. Consider breaking this function into smaller, more focused functions, each handling a specific part of the job generation process. This would improve readability and maintainability.
Line range hint 207-508: Consider modularizing the `merge_by_stream` function.
The `merge_by_stream` function is extensive and handles many operations, from acquiring locks to updating the database. Splitting this function into smaller, more manageable parts could enhance clarity and make the code easier to maintain.
web/src/plugins/logs/SearchBar.vue (1)
Line range hint 918-930: Validate input before assignment to avoid potential issues.
The refactoring of the `downloadRangeData` method to include input validation is a positive change, ensuring that the initial number is positive before proceeding with the download logic. However, it's crucial to perform this validation before assigning the value to `this.searchObj.data.customDownloadQueryObj.query.from`. This ensures that only valid data affects the application state, enhancing both security and reliability.

```diff
- this.searchObj.data.customDownloadQueryObj.query.from = initNumber;
+ // This line has been moved above to ensure validation before assignment
```
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (11)
- src/config/src/config.rs (2 hunks)
- src/config/src/meta/sql.rs (1 hunks)
- src/config/src/utils/time.rs (1 hunks)
- src/infra/src/file_list/mod.rs (4 hunks)
- src/infra/src/file_list/mysql.rs (3 hunks)
- src/infra/src/file_list/postgres.rs (4 hunks)
- src/infra/src/file_list/sqlite.rs (3 hunks)
- src/job/compactor.rs (4 hunks)
- src/service/compact/merge.rs (9 hunks)
- src/service/compact/mod.rs (4 hunks)
- web/src/plugins/logs/SearchBar.vue (2 hunks)
Additional context used
Path-based instructions (10)
The same reviewer pattern applies to all ten Rust files selected above (src/job/compactor.rs, src/infra/src/file_list/mod.rs, src/config/src/utils/time.rs, src/service/compact/mod.rs, src/infra/src/file_list/mysql.rs, src/infra/src/file_list/postgres.rs, src/infra/src/file_list/sqlite.rs, src/config/src/meta/sql.rs, src/service/compact/merge.rs, src/config/src/config.rs):

Pattern `**/*.rs`: You are a smart rustlang pull request reviewer. You are going to review all the rustlang files. Be concise, and add a brief explanation to your suggestions. Make sure the responses are not too verbose and keep the suggestions to the point, i.e. actionable.
Additional comments not posted (14)
src/infra/src/file_list/mod.rs (1)
118-129: Ensure that the new methods related to merge job management (`add_job`, `get_pending_jobs`, `set_job_done`, `update_running_jobs`, `check_running_jobs`, `clean_done_jobs`) handle exceptions and errors appropriately, especially in asynchronous contexts.
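For orientation, a hedged reconstruction of that method surface, inferred from the call sites quoted elsewhere in this review; the trait name, the `MergeJobRecord` placeholder, and the exact return types are assumptions:

```rust
// Assumed shape of the merge-job methods; see the mysql/postgres
// implementations quoted later in this review for the concrete bodies.
pub struct MergeJobRecord; // stand-in for the real job record type

pub trait FileListJobQueue {
    async fn add_job(&self, org_id: &str, stream_type: &str, stream: &str, offset: i64) -> anyhow::Result<()>;
    async fn get_pending_jobs(&self, node: &str, limit: i64) -> anyhow::Result<Vec<MergeJobRecord>>;
    async fn set_job_done(&self, id: i64) -> anyhow::Result<()>;
    async fn update_running_jobs(&self, id: i64) -> anyhow::Result<()>;
    async fn check_running_jobs(&self, before_date: i64) -> anyhow::Result<()>;
    async fn clean_done_jobs(&self, before_date: i64) -> anyhow::Result<()>;
}
```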
src/config/src/utils/time.rs (1)
37-40: The function `now_micros` is a useful addition for high-precision time requirements. Ensure that it is used consistently across the project where such precision is necessary.
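A minimal sketch of such a helper, assuming it wraps chrono (the actual implementation lives in src/config/src/utils/time.rs):

```rust
use chrono::Utc;

// Sketch: current Unix time in microseconds, matching the precision the
// compactor uses for updated_at thresholds elsewhere in this PR.
pub fn now_micros() -> i64 {
    Utc::now().timestamp_micros()
}
```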
src/infra/src/file_list/mysql.rs (2)
626-701: Optimize transaction handling for better performance.
[REFACTOR_SUGGESTION]
This method involves multiple database operations within a transaction, which could be optimized. Consider consolidating the database calls or using batch operations where possible to reduce the overhead of transaction management and improve performance.
741-751: Validate the logic for cleaning up jobs.
The `clean_done_jobs` method deletes jobs based on the `updated_at` timestamp and job status. Verify that this logic correctly identifies only the intended jobs to prevent accidental deletion of active jobs.

Verification successful

The gathered context provides a clear view of how the `clean_done_jobs` function is utilized. The `run_clean_done_jobs` function in `src/job/compactor.rs` runs in a loop, periodically invoking `clean_done_jobs` with a calculated `updated_at` timestamp, ensuring that only jobs older than a certain threshold are targeted. This logic appears sound and should prevent the accidental deletion of active jobs, as it relies on the `updated_at` timestamp and the job status.

`src/job/compactor.rs`:
- `run_clean_done_jobs` calculates an `updated_at` timestamp based on a configured wait time.
- It calls `clean_done_jobs` with this timestamp in a loop, ensuring periodic cleanup.

`src/infra/src/file_list/mod.rs`:
- The `clean_done_jobs` function is defined and called through the `CLIENT` interface.

The logic for cleaning up jobs based on the `updated_at` timestamp and job status appears to be correctly implemented.

Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Check for any active jobs that might be incorrectly marked for deletion.
rg --type rust "clean_done_jobs"
```
Length of output: 887

```bash
#!/bin/bash
# Extract the implementation of run_clean_done_jobs from src/job/compactor.rs
ast-grep --lang rust --pattern 'async fn run_clean_done_jobs() -> Result<$_, $_> { $$$ }'

# Extract the relevant parts of src/infra/src/file_list/mod.rs where clean_done_jobs is defined and called
rg -A 10 'async fn clean_done_jobs' src/infra/src/file_list/mod.rs
```
Length of output: 1526
src/infra/src/file_list/postgres.rs (4)
720-728: Optimize the `set_job_done` method by caching the database connection.
[PERFORMANCE]
The method `set_job_done` retrieves a new database connection on every call. This could be optimized by reusing a connection or using a connection pool. Consider modifying the method to accept a database connection as a parameter or use a global/static connection pool.
731-738: Review the necessity of frequent time updates in `update_running_jobs`.
[PERFORMANCE]
The method `update_running_jobs` updates the `updated_at` timestamp every time it's called. If this method is called frequently, it could lead to performance issues due to the constant updates. Consider if there's a way to reduce the frequency of these updates or batch them.
630-718: Ensure proper transaction handling in `get_pending_jobs`.
The method `get_pending_jobs` uses a transaction, which is good for consistency. However, the rollback scenarios are scattered and might be missed in future modifications. Consider using a transaction management pattern or a context manager that ensures the rollback in all error scenarios.

Verification successful

The extracted code for the `get_pending_jobs` function reveals that transaction rollbacks are indeed handled in various error scenarios. Each `Err(e)` block includes a `tx.rollback().await` call, ensuring that transactions are rolled back upon encountering errors. Here are the key points:

- Rollback is handled after failing to acquire the advisory lock.
- Rollback is handled after failing to fetch pending jobs.
- Rollback is handled if there are no pending jobs to update.
- Rollback is handled after failing to update job statuses.
- Rollback is handled after failing to fetch jobs by IDs.

Given this thorough handling of transaction rollbacks, the initial review comment's concern about scattered rollbacks is mitigated. A sketch of the suggested centralized-rollback pattern follows the scripts below.
Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Verify that all error paths in get_pending_jobs handle transactions correctly
rg --type rust "tx.rollback().await" "src/infra/src/file_list/postgres.rs"
```
Length of output: 74

```bash
#!/bin/bash
# Extract the `get_pending_jobs` function from `postgres.rs` for manual review
ast-grep --lang rust --pattern $'impl $_ { $$$ async fn get_pending_jobs(&self, $_, $_) -> $_ { $$$ } $$$ }' "src/infra/src/file_list/postgres.rs"
```
Length of output: 169

```bash
#!/bin/bash
# Extract lines 630-718 from `postgres.rs` for manual review
sed -n '630,718p' src/infra/src/file_list/postgres.rs
```
Length of output: 3356
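As a sketch of the transaction-management pattern suggested above (a hypothetical helper, not code from this PR), all statements can run inside one async block so errors funnel into a single rollback site:

```rust
use sqlx::PgPool;

// Sketch: one rollback site instead of a rollback in every error arm.
async fn with_single_rollback(pool: &PgPool) -> sqlx::Result<()> {
    let mut tx = pool.begin().await?;
    let result: sqlx::Result<()> = async {
        sqlx::query("UPDATE file_list_jobs SET updated_at = $1 WHERE status = $2;")
            .bind(0_i64) // placeholder values for the sketch
            .bind(1_i32)
            .execute(&mut *tx)
            .await?;
        // ...more statements here...
        Ok(())
    }
    .await;
    match result {
        Ok(()) => tx.commit().await,
        Err(e) => {
            tx.rollback().await?;
            Err(e)
        }
    }
}
```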
938-953: Ensure the new `file_list_jobs` table is indexed properly.
The creation of the `file_list_jobs` table includes setting up indexes. It's important to verify that these indexes are appropriate for the queries run against this table to ensure optimal performance.

Verification successful

The `file_list_jobs` table is indexed with the `file_list_jobs_stream_status_idx` index on the `status` and `stream` columns in the `src/infra/src/file_list/postgres.rs` file. This confirms that an index is created for the table.

- Index creation in `src/infra/src/file_list/postgres.rs`: `"CREATE INDEX IF NOT EXISTS file_list_jobs_stream_status_idx on file_list_jobs (status, stream);"`

Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Verify that indexes on file_list_jobs are used in queries
rg --type rust "FROM file_list_jobs" "src/infra/src/file_list/postgres.rs" | rg "INDEX"
```
Length of output: 85

```bash
#!/bin/bash
# Search for index creation commands related to file_list_jobs table
rg --type sql "CREATE INDEX" | rg "file_list_jobs"
```
Length of output: 199

```bash
#!/bin/bash
# Search for index creation commands related to file_list_jobs table without file type filter
rg "CREATE INDEX" | rg "file_list_jobs"
```
Length of output: 461
src/infra/src/file_list/sqlite.rs (4)
718-728: Ensure proper error handling and logging in `set_job_done`.
Currently, the function does not log any information on success or failure, which could hinder debugging. Adding logging could improve observability.

```rust
sqlx::query(r#"UPDATE file_list_jobs SET status = $1, updated_at = $2 WHERE id = $3;"#)
    .bind(super::FileListJobStatus::Done)
    .bind(config::utils::time::now_micros())
    .bind(id)
    .execute(&*client)
    .await
    .map_err(|e| {
        log::error!("Failed to set job done: {}", e);
        e
    })?;
log::info!("Job {} marked as done", id);
Ok(())
```
945-961: Ensure the new table creation adheres to schema requirements.
The creation of the `file_list_jobs` table should be verified to ensure it meets all schema and business logic requirements. This includes checking constraints, types, and indexes.

```rust
sqlx::query(r#"CREATE TABLE IF NOT EXISTS file_list_jobs (...) "#)
    .execute(&*client)
    .await?;
log::info!("Table file_list_jobs created successfully.");
Ok(())
```
1020-1026: Check the uniqueness and indexing strategy for `file_list_jobs`.
The unique index on `stream` and `offsets` might cause issues if the same stream can have jobs with identical offsets. Verify that this indexing strategy aligns with the expected usage patterns.

Verification successful

The search results provide extensive information on the usage and constraints related to the `file_list_jobs` table across different database systems (SQLite, MySQL, Postgres). Key observations include:

- The `file_list_jobs` table schema consistently includes `stream` and `offsets` as non-nullable fields.
- There are unique indexes created on the combination of `stream` and `offsets` in all database systems.
- Multiple instances of `INSERT INTO file_list_jobs` statements suggest that `stream` and `offsets` are frequently used together, and there are mechanisms to handle unique constraint violations.
- Queries and updates frequently interact with the `file_list_jobs` table, indicating its active role in job management.

Given these observations, the unique index on `stream` and `offsets` seems intentional and aligns with the expected usage patterns. There are no immediate indications of issues with this indexing strategy.

- src/infra/src/file_list/sqlite.rs
- src/infra/src/file_list/mysql.rs
- src/infra/src/file_list/postgres.rs

Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Check for potential issues with unique index on stream and offsets
echo "Add verification logic here"
```
Length of output: 64

```bash
#!/bin/bash
# Search for the definition of the file_list_jobs table and any associated constraints or indexes
rg "CREATE TABLE.*file_list_jobs" -A 20
```
Length of output: 3761

```bash
#!/bin/bash
# Search for comments or documentation related to the usage patterns of stream and offsets
rg -i "stream.*offsets" -A 5

# Search for queries or code that interact with the file_list_jobs table
rg -i "file_list_jobs" -A 10
```
Length of output: 35474
759-771: Validate the logic for cleaning done jobs.
The function deletes jobs based on status and update time. It's crucial to ensure that this does not accidentally delete jobs that are still needed. A verification step or additional logging could be beneficial.

Verification successful

The `FileListJobStatus` enum is defined with three variants: `Pending`, `Running`, and `Done`. The `clean_done_jobs` function deletes jobs with the status `Done` that were updated before a specified date. This logic appears to be sound, as it targets only jobs that are marked as `Done` and have not been updated recently.

The `FileListJobStatus` enum definition is located in `src/infra/src/file_list/mod.rs`:

```rust
pub enum FileListJobStatus {
    #[default]
    Pending,
    Running,
    Done,
}
```

Given this context, the function's logic should be verified to ensure it does not accidentally delete essential jobs. However, based on the current information, the implementation seems appropriate for its intended purpose.

Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Verify that no essential jobs are deleted unintentionally
# This script should check the job statuses and their last update times to ensure correctness.
echo "Add verification logic here"
```
Length of output: 64

```bash
#!/bin/bash
# Find the definition and usage of FileListJobStatus::Done
ast-grep --lang rust --pattern 'enum FileListJobStatus { Done, $$$ }'
```
Length of output: 71

```bash
#!/bin/bash
# Find the definition of FileListJobStatus enum
ast-grep --lang rust --pattern 'enum FileListJobStatus { $$$ }'
```
Length of output: 65

```bash
#!/bin/bash
# Search for the term FileListJobStatus in the codebase
rg 'FileListJobStatus'
```
Length of output: 1858

```bash
#!/bin/bash
# View the definition of FileListJobStatus in src/infra/src/file_list/mod.rs
cat src/infra/src/file_list/mod.rs | grep -A 10 'pub enum FileListJobStatus'
```
Length of output: 160
src/config/src/meta/sql.rs (1)
265-265: Updated error message for `Group` conversion improves clarity and specificity.
src/config/src/config.rs (1)
873-890: Ensure the new configuration fields for the compactor module are well-documented and the default values are appropriate.
The new settings `batch_size`, `job_run_timeout`, and `job_clean_wait_time` are crucial for managing job operations efficiently in the compactor module. The default values seem reasonable, but you might want to ensure they align with expected operational parameters in production environments.
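A hedged sketch of how these three fields might look on the compactor config struct; the struct name and layout are assumptions (the real definition is in src/config/src/config.rs, and the matching env vars are listed at the end of this page):

```rust
// Sketch only: compactor job-queue settings and their documented defaults,
// mirroring ZO_COMPACT_BATCH_SIZE, ZO_COMPACT_JOB_RUN_TIMEOUT, and
// ZO_COMPACT_JOB_CLEAN_WAIT_TIME.
pub struct CompactConfig {
    /// Batch size when fetching pending merge jobs (default 100).
    pub batch_size: i64,
    /// Seconds before a running job is considered stale (default 600).
    pub job_run_timeout: i64,
    /// Seconds to keep finished jobs before cleanup (default 86400).
    pub job_clean_wait_time: i64,
}
```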
```rust
async fn run_generate_job() -> Result<(), anyhow::Error> {
    loop {
        time::sleep(time::Duration::from_secs(get_config().compact.interval)).await;
        let locker = compact::QUEUE_LOCKER.clone();
        let locker = locker.lock().await;
        log::debug!("[COMPACTOR] Running generate merge job");
        if let Err(e) = compact::run_generate_job().await {
            log::error!("[COMPACTOR] run generate merge job error: {e}");
        }
        drop(locker);
    }
}
```
Tip: Codebase Verification
The `run_generate_job` function in `src/service/compact/mod.rs` logs errors from `merge::generate_job_by_stream` but does not escalate or terminate on these errors. This may suppress critical errors that should lead to termination or further action.

- `src/service/compact/mod.rs`: The `run_generate_job` function logs errors but does not handle them beyond logging.
Analysis chain
Verify that the error handling in run_generate_job does not suppress critical errors that should lead to a termination or escalation.
Scripts executed
The following scripts were executed for the analysis:
```bash
#!/bin/bash
# Description: Verify error handling paths in run_generate_job function.
ast-grep --lang rust --pattern $'async fn run_generate_job($_) {
  $$$
  Err($e) {
    $$$
  }
  $$$
}'
```
Length of output: 104

```bash
#!/bin/bash
# Find the implementation of the `compact::run_generate_job` function to analyze its error handling.
ast-grep --lang rust --pattern 'async fn run_generate_job($_) { $$$ }'
```
Length of output: 72

```bash
#!/bin/bash
# Search for the `run_generate_job` function definition within the `compact` module.
rg 'async fn run_generate_job' --context 5
```
Length of output: 1211

```bash
#!/bin/bash
# Extract the full implementation of the `run_generate_job` function from `src/service/compact/mod.rs`.
sed -n '/pub async fn run_generate_job/,/}/p' src/service/compact/mod.rs
```
Length of output: 606

```bash
#!/bin/bash
# Extract the full implementation of the `run_generate_job` function from `src/service/compact/mod.rs`, ensuring it captures all lines.
sed -n '/pub async fn run_generate_job/,/^}/p' src/service/compact/mod.rs
```
Length of output: 2850
```rust
async fn run_check_running_jobs() -> Result<(), anyhow::Error> {
    loop {
        let timeout = get_config().compact.job_run_timeout;
        time::sleep(time::Duration::from_secs(timeout as u64)).await;
        log::debug!("[COMPACTOR] Running check running jobs");
        let updated_at = config::utils::time::now_micros() - (timeout * 1000 * 1000);
        if let Err(e) = infra::file_list::check_running_jobs(updated_at).await {
            log::error!("[COMPACTOR] run check running jobs error: {e}");
        }
    }
}

async fn run_clean_done_jobs() -> Result<(), anyhow::Error> {
    loop {
        let wait_time = get_config().compact.job_clean_wait_time;
        time::sleep(time::Duration::from_secs(wait_time as u64)).await;
        log::debug!("[COMPACTOR] Running clean done jobs");
        let updated_at = config::utils::time::now_micros() - (wait_time * 1000 * 1000);
        if let Err(e) = infra::file_list::clean_done_jobs(updated_at).await {
            log::error!("[COMPACTOR] run clean done jobs error: {e}");
        }
    }
}
```
Tip: Codebase Verification
The functions `run_check_running_jobs` and `run_clean_done_jobs` in `src/job/compactor.rs` do include error logging when they encounter issues with checking or cleaning jobs. However, there is no additional logic to handle edge cases beyond logging the errors.

- `run_check_running_jobs` logs an error if `infra::file_list::check_running_jobs(updated_at).await` fails.
- `run_clean_done_jobs` logs an error if `infra::file_list::clean_done_jobs(updated_at).await` fails.

These functions should ideally include more robust error handling mechanisms to address edge cases, such as retrying the operation, marking jobs as failed, or sending alerts.

- File: `src/job/compactor.rs`, Lines: 174-193
Analysis chain
Check that the logic for run_check_running_jobs and run_clean_done_jobs correctly handles edge cases, such as jobs that fail to update or clean up.
Scripts executed
The following scripts were executed for the analysis:
```bash
#!/bin/bash
# Description: Verify edge case handling in job management functions.
ast-grep --lang rust --pattern $'async fn run_check_running_jobs($_) {
  $$$
  if ($condition) {
    $$$
  }
  $$$
}'
```
Length of output: 118

```bash
#!/bin/bash
# Searching for the implementations of the functions `run_check_running_jobs` and `run_clean_done_jobs`
ast-grep --lang rust --pattern 'async fn run_check_running_jobs($_) { $$$ }'
ast-grep --lang rust --pattern 'async fn run_clean_done_jobs($_) { $$$ }'
```
Length of output: 154

```bash
#!/bin/bash
# Searching for the definitions of the functions `run_check_running_jobs` and `run_clean_done_jobs` in the codebase
rg 'async fn run_check_running_jobs'
rg 'async fn run_clean_done_jobs'
```
Length of output: 243

```bash
#!/bin/bash
# Extracting the implementations of the functions `run_check_running_jobs` and `run_clean_done_jobs` from `src/job/compactor.rs`
sed -n '/async fn run_check_running_jobs()/,/^}/p' src/job/compactor.rs
sed -n '/async fn run_clean_done_jobs()/,/^}/p' src/job/compactor.rs
```
Length of output: 1199
Actionable comments posted: 1
Outside diff range and nitpick comments (3)
src/service/compact/merge.rs (3)
Line range hint 76-195: Refactor the function `generate_job_by_stream` to handle job generation by stream.
- The function now correctly handles multiple conditions to ensure that only one node processes a job at a time. It uses distributed locks and checks if another node is already processing the job.
- The use of `LOCAL_NODE_UUID` to check and set the node processing the job is a robust approach.
- The function logs the generation of the job, which aids in debugging and monitoring.
- Consider adding more detailed comments about the conditions and logic used, especially around the distributed lock handling and node checks, to improve maintainability.
Line range hint 197-512: Refactor the function `merge_by_stream` to handle the merging of files based on stream.
- This function has been significantly refactored to handle different steps of merging files based on a job queue.
- The function now includes detailed logging at various steps which will help in monitoring and debugging.
- There is a comprehensive check for file sizes and partition keys which ensures that only relevant files are merged.
- The error handling and retry mechanisms are robust, particularly in the file list operations.
- Consider simplifying some of the nested logic, especially around file and partition checks, to improve readability and maintainability.
Line range hint 876-1000: Enhance error handling in the `write_file_list_db_only` function.
- The function attempts multiple retries when writing to the external database, which is good for resilience.
- The use of separate lists for flattened and non-flattened files is a good practice, as it allows for more targeted operations.
- Consider implementing an exponential backoff mechanism for retries to handle high load or transient issues more effectively; see the sketch after this list.
- Adding more detailed logging about the success or failure of each operation within the retry loop would be beneficial for troubleshooting.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/service/compact/merge.rs (9 hunks)
Additional context used
Path-based instructions (1)
src/service/compact/merge.rs (1)
Same `**/*.rs` reviewer pattern as above.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/service/traces/otlp_http.rs (1 hunks)
Additional context used
Path-based instructions (1)
src/service/traces/otlp_http.rs (1)
Same `**/*.rs` reviewer pattern as above.
Additional comments not posted (1)
src/service/traces/otlp_http.rs (1)
46-54: Improved error handling by using a match statement to return a detailed BadRequest response if decoding fails.
Actionable comments posted: 0
Outside diff range and nitpick comments (5)
src/config/src/utils/inverted_index.rs (1)
20-41: The `split_token` function is well-implemented for dynamic token splitting and filtering. Consider adding a brief comment explaining the use of `INDEX_MIN_CHAR_LEN` for clarity.
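For illustration, a minimal sketch of a split_token-style helper; the constant's value and the lowercasing behavior are assumptions for the example, not the actual implementation in inverted_index.rs:

```rust
// Assumed minimum token length; the real INDEX_MIN_CHAR_LEN lives in the config crate.
const INDEX_MIN_CHAR_LEN: usize = 3;

// Split on non-alphanumeric boundaries and keep only tokens long enough to index.
fn split_token(s: &str) -> Vec<String> {
    s.split(|c: char| !c.is_alphanumeric())
        .filter(|t| t.len() >= INDEX_MIN_CHAR_LEN)
        .map(|t| t.to_lowercase())
        .collect()
}
```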
src/job/files/parquet.rs (4)
Line range hint 818-941: Consider optimizing the `prepare_index_record_batches` function for better performance and maintainability.
- The function could benefit from a more efficient handling of null columns and term extraction.
- Consider parallel processing or batch processing for handling large datasets.
Line range hint 960-1003: Ensure robust error handling in `move_single_file`.
- The function should handle potential failures in file retrieval and record batch creation more gracefully.
- Consider adding retry mechanisms or more informative logging to help diagnose issues.
Line range hint 675-767: Improve the `generate_index_on_ingester` function for better error handling and efficiency.
- The function could be refactored to reduce complexity and improve error propagation.
- Consider using more efficient data structures or algorithms for handling large data volumes.
Line range hint 1004-1100: Optimize the `merge_files` function for better performance and reliability.
- The function could be optimized to handle large numbers of files more efficiently.
- Consider implementing parallel processing or optimizing file I/O operations.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (8)
- src/config/src/config.rs (5 hunks)
- src/config/src/utils/inverted_index.rs (1 hunks)
- src/config/src/utils/mod.rs (1 hunks)
- src/ingester/src/writer.rs (1 hunks)
- src/job/files/parquet.rs (7 hunks)
- src/service/search/cluster/mod.rs (2 hunks)
- src/wal/src/lib.rs (1 hunks)
- web/src/plugins/logs/SearchBar.vue (2 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/config/src/config.rs
- web/src/plugins/logs/SearchBar.vue
Additional context used
Path-based instructions (6)
src/config/src/utils/mod.rs, src/wal/src/lib.rs, src/config/src/utils/inverted_index.rs, src/ingester/src/writer.rs, src/job/files/parquet.rs, src/service/search/cluster/mod.rs (1 each): same `**/*.rs` reviewer pattern as above.
Additional comments not posted (6)
src/config/src/utils/mod.rs (1)
23-23: The addition of the `inverted_index` module aligns with the enhancements described in the PR summary.
src/wal/src/lib.rs (1)
28-29: Standardizing the file type identifier with constants improves code maintainability and readability.
src/ingester/src/writer.rs (1)
287-287: Using `FILE_TYPE_IDENTIFIER_LEN` improves the clarity of the condition. Ensure the complex logical condition accurately implements the intended checks.
src/job/files/parquet.rs (1)
Line range hint 767-818: Ensure consistency and reliability in `generate_index_on_compactor`.
- Verify that the function consistently handles edge cases and large datasets.
- Consider adding more comprehensive tests to cover various scenarios and data sizes.
src/service/search/cluster/mod.rs (2)
30-30: The import of `split_token` is correctly placed and used effectively in the search logic.
121-121: Usage of the `split_token` function is correctly implemented for processing search terms in a flexible and efficient manner.
…r can't support multiple compactor working on it
Actionable comments posted: 10
Outside diff range and nitpick comments (2)
src/service/compact/merge.rs (2)
Line range hint 76-195: Consider simplifying the `generate_job_by_stream` function.
The function `generate_job_by_stream` is quite lengthy and handles multiple responsibilities, including database operations, locking, and conditional checks. It would improve readability and maintainability to break this function into smaller, more focused functions, each handling a specific part of the logic.
Line range hint 197-499: Refactor `merge_by_stream` to improve error handling and reduce complexity.
The `merge_by_stream` function is complex and handles many operations. Consider breaking it down into smaller functions or methods, each responsible for parts of the workflow, like fetching schema, querying files, and handling the merge process. Also, ensure that all potential error paths are handled gracefully and provide meaningful error messages.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (10)
- src/config/src/meta/stream.rs (1 hunks)
- src/config/src/utils/time.rs (1 hunks)
- src/infra/src/file_list/mod.rs (4 hunks)
- src/infra/src/file_list/mysql.rs (3 hunks)
- src/infra/src/file_list/postgres.rs (4 hunks)
- src/infra/src/file_list/sqlite.rs (3 hunks)
- src/job/compactor.rs (2 hunks)
- src/job/files/idx.rs (1 hunks)
- src/service/compact/merge.rs (8 hunks)
- src/service/compact/mod.rs (5 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/config/src/utils/time.rs
- src/infra/src/file_list/mod.rs
- src/infra/src/file_list/sqlite.rs
Additional context used
Path-based instructions (7)
src/job/files/idx.rs, src/job/compactor.rs, src/service/compact/mod.rs, src/config/src/meta/stream.rs, src/infra/src/file_list/mysql.rs, src/infra/src/file_list/postgres.rs, src/service/compact/merge.rs (1 each): same `**/*.rs` reviewer pattern as above.
Additional comments not posted (10)
src/job/files/idx.rs (1)
81-81: Improved log message format enhances clarity and traceability.
src/service/compact/mod.rs (3)
44-88: Refactored `run_retention` function improves handling based on stream-specific retention settings and enhances error logging with detailed stream information.
Line range hint 138-202: Updated `run_generate_job` function now includes checks for stream-specific settings and detailed error logging, improving reliability and maintainability.
211-343: The `run_merge` function has been enhanced to handle jobs based on stream settings and partition levels more efficiently, with improved error logging.
src/config/src/meta/stream.rs (1)
34-42: Adding `ALL_STREAM_TYPES` as a constant array is a good practice for managing multiple stream types centrally. This allows for easier maintenance and scalability.
src/infra/src/file_list/mysql.rs (2)
934-949: Creation of the `file_list_jobs` table is implemented correctly. The SQL statement for creating the `file_list_jobs` table is well-formed and includes all necessary fields.
1010-1016: Ensure the uniqueness and efficiency of the new indexes on `file_list_jobs`. The creation of unique and non-unique indexes on the `file_list_jobs` table is appropriate for optimizing queries and ensuring data integrity.
src/infra/src/file_list/postgres.rs (3)
630-718: Review the transaction handling in `get_pending_jobs` to ensure atomicity and error recovery.

```rust
async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    let lock_key = "file_list_jobs:get_pending_jobs";
    let lock_id = config::utils::hash::gxhash::new().sum64(lock_key);
    let lock_id = if lock_id > i64::MAX as u64 {
        (lock_id >> 1) as i64
    } else {
        lock_id as i64
    };
    let lock_sql = format!("SELECT pg_advisory_xact_lock({lock_id})");
    sqlx::query(&lock_sql).execute(&mut *tx).await?;
    let sql = r#"
SELECT stream, max(id) as id, COUNT(*)::BIGINT AS num
FROM file_list_jobs
WHERE status = $1
GROUP BY stream
ORDER BY num DESC
LIMIT $2;
"#;
    let pending_jobs = sqlx::query_as::<_, super::MergeJobPendingRecord>(sql)
        .bind(super::FileListJobStatus::Pending)
        .bind(limit)
        .fetch_all(&mut *tx)
        .await?;
    let ids = pending_jobs.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    if ids.is_empty() {
        tx.rollback().await?;
        return Ok(Vec::new());
    }
    let update_sql = format!(
        "UPDATE file_list_jobs SET status = $1, node = $2, updated_at = $3 WHERE id IN ({});",
        ids.join(",")
    );
    sqlx::query(&update_sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await?;
    let select_sql = format!("SELECT * FROM file_list_jobs WHERE id IN ({});", ids.join(","));
    let jobs = sqlx::query_as::<_, super::MergeJobRecord>(&select_sql)
        .fetch_all(&mut *tx)
        .await?;
    tx.commit().await?;
    Ok(jobs)
}
```

Ensure that the transaction is properly rolled back on errors and committed only when all operations are successful. This script will verify the transaction management logic.
953-968: Ensure the new table `file_list_jobs` is appropriately indexed for performance. The creation of the `file_list_jobs` table with necessary fields and types is correctly implemented. The fields are well-defined, and the SQL syntax is correct.
1029-1035: Check the uniqueness and indexing strategy on the `file_list_jobs` table. The unique index on `stream` and `offsets` ensures that there are no duplicate job entries for the same stream and offset, which is crucial for the integrity of job processing. The status index will aid in efficient querying of jobs based on their current state.
```rust
if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
    return Err(anyhow::anyhow!(
        "[COMAPCT] add file_list_jobs failed: {}",
        e
    ));
}
```
Enhance error messages for better traceability.
The error message in the error branch of `infra_file_list::add_job` could include more context about the failing operation, such as `org_id`, `stream_type`, and `stream_name`. This would make debugging easier when reading logs.

```diff
- "[COMAPCT] add file_list_jobs failed: {}"
+ "[COMAPCT] add file_list_jobs failed for {}/{}/{} with error: {}", org_id, stream_type, stream_name, e
```

Committable suggestion

```rust
if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
    return Err(anyhow::anyhow!(
        "[COMAPCT] add file_list_jobs failed for {}/{}/{} with error: {}",
        org_id, stream_type, stream_name, e
    ));
}
```
```rust
    .await?;
// update job status
if let Err(e) = infra_file_list::set_job_done(job_id).await {
    log::error!("[COMPACT] set_job_done failed: {e}");
```

Standardize error logging format.
Ensure a consistent error logging format for better readability and maintenance; the inline-captured `{e}` can be written as `{}` with an explicit argument for uniformity:

```diff
- log::error!("[COMPACT] set_job_done failed: {e}");
+ log::error!("[COMPACT] set_job_done failed: {}", e);
```
```rust
async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    match sqlx::query(
        "INSERT IGNORE INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES (?, ?, ?, ?, '', 0);",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await
    {
        Err(sqlx::Error::Database(e)) => {
            if e.is_unique_violation() {
                Ok(())
            } else {
                Err(Error::Message(e.to_string()))
            }
        }
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}
```
Consider refactoring the error handling in the `add_job` function for clarity and maintainability.

```rust
async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    let result = sqlx::query(
        "INSERT IGNORE INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES (?, ?, ?, ?, '', 0);",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await;
    match result {
        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(()),
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}
```
```rust
async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    // get pending jobs group by stream and order by num desc
    let ret = match sqlx::query_as::<_, super::MergeJobPendingRecord>(
        r#"
SELECT stream, max(id) as id, CAST(COUNT(*) AS SIGNED) AS num
FROM file_list_jobs
WHERE status = ?
GROUP BY stream
ORDER BY num DESC
LIMIT ?
FOR UPDATE;"#,
    )
    .bind(super::FileListJobStatus::Pending)
    .bind(limit)
    .fetch_all(&mut *tx)
    .await
    {
        Ok(v) => v,
        Err(e) => {
            if let Err(e) = tx.rollback().await {
                log::error!(
                    "[MYSQL] rollback select file_list_jobs pending jobs for update error: {e}"
                );
            }
            return Err(e.into());
        }
    };
    // update jobs status to running
    let ids = ret.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    if ids.is_empty() {
        if let Err(e) = tx.rollback().await {
            log::error!("[MYSQL] rollback select file_list_jobs pending jobs error: {e}");
        }
        return Ok(Vec::new());
    }
    let sql = format!(
        "UPDATE file_list_jobs SET status = ?, node = ?, updated_at = ? WHERE id IN ({});",
        ids.join(",")
    );
    if let Err(e) = sqlx::query(&sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await
    {
        if let Err(e) = tx.rollback().await {
            log::error!("[MYSQL] rollback update file_list_jobs status error: {e}");
        }
        return Err(e.into());
    }
    // get jobs by ids
    let sql = format!("SELECT * FROM file_list_jobs WHERE id IN ({});", ids.join(","));
    let ret = match sqlx::query_as::<_, super::MergeJobRecord>(&sql)
        .fetch_all(&mut *tx)
        .await
    {
        Ok(v) => v,
        Err(e) => {
            if let Err(e) = tx.rollback().await {
                log::error!("[MYSQL] rollback select file_list_jobs by ids error: {e}");
            }
            return Err(e.into());
        }
    };
    if let Err(e) = tx.commit().await {
        log::error!("[MYSQL] commit select file_list_jobs pending jobs error: {e}");
        return Err(e.into());
    }
    Ok(ret)
}
```
Optimize the transaction handling in `get_pending_jobs` to reduce potential rollback overhead.

```rust
async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    let ret = sqlx::query_as::<_, super::MergeJobPendingRecord>(
        r#"
SELECT stream, max(id) as id, CAST(COUNT(*) AS SIGNED) AS num
FROM file_list_jobs
WHERE status = ?
GROUP BY stream
ORDER BY num DESC
LIMIT ?
FOR UPDATE;"#,
    )
    .bind(super::FileListJobStatus::Pending)
    .bind(limit)
    .fetch_all(&mut *tx)
    .await?;
    if ret.is_empty() {
        tx.rollback().await?;
        return Ok(Vec::new());
    }
    let ids = ret.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    let sql = format!(
        "UPDATE file_list_jobs SET status = ?, node = ?, updated_at = ? WHERE id IN ({});",
        ids.join(",")
    );
    sqlx::query(&sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await?;
    let ret = sqlx::query_as::<_, super::MergeJobRecord>(
        &format!("SELECT * FROM file_list_jobs WHERE id IN ({});", ids.join(","))
    )
    .fetch_all(&mut *tx)
    .await?;
    tx.commit().await?;
    Ok(ret)
}
```
```rust
async fn set_job_done(&self, id: i64) -> Result<()> {
    let pool = CLIENT.clone();
    sqlx::query(r#"UPDATE file_list_jobs SET status = ?, updated_at = ? WHERE id = ?;"#)
        .bind(super::FileListJobStatus::Done)
        .bind(config::utils::time::now_micros())
        .bind(id)
        .execute(&pool)
        .await?;
    Ok(())
}
```
Ensure proper error handling for database operations in `set_job_done`.

```rust
async fn set_job_done(&self, id: i64) -> Result<()> {
    let pool = CLIENT.clone();
    let result = sqlx::query(r#"UPDATE file_list_jobs SET status = ?, updated_at = ? WHERE id = ?;"#)
        .bind(super::FileListJobStatus::Done)
        .bind(config::utils::time::now_micros())
        .bind(id)
        .execute(&pool)
        .await;
    match result {
        Ok(_) => Ok(()),
        Err(e) => {
            log::error!("[MYSQL] Error updating job status: {}", e);
            Err(e.into())
        }
    }
}
```
```rust
async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    match sqlx::query(
        "INSERT INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES ($1, $2, $3, $4, '', 0) ON CONFLICT DO NOTHING;",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await
    {
        Err(sqlx::Error::Database(e)) => {
            if e.is_unique_violation() {
                Ok(())
            } else {
                Err(Error::Message(e.to_string()))
            }
        }
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}
```
Consider revising the error handling in the `add_job` method to enhance clarity and reduce complexity.

```rust
async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    let sql = "INSERT INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES ($1, $2, $3, $4, '', 0) ON CONFLICT DO NOTHING;";
    let result = sqlx::query(sql)
        .bind(org_id)
        .bind(stream_key)
        .bind(offset)
        .bind(super::FileListJobStatus::Pending)
        .execute(&pool)
        .await;
    match result {
        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(()),
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}
```

This refactoring simplifies the match arms and removes unnecessary nesting. It also uses a variable for the SQL query for better readability.
```rust
if let Err(e) = compact::run_merge(tx.clone()).await {
    log::error!("[COMPACTOR] run data merge error: {e}");
```
Consider enhancing the error message in `run_merge` to include more details about the error context for better debugging.

```diff
- log::error!("[COMPACTOR] run data merge error: {e}");
+ log::error!("[COMPACTOR] run data merge error on stream {stream_name} with job ID {job_id}: {e}");
```

Committable suggestion was skipped due to low confidence.
```diff
- if ret.is_err() {
-     log::error!("[COMPACTOR] run data delete error: {}", ret.err().unwrap());
+ if let Err(e) = compact::run_retention().await {
+     log::error!("[COMPACTOR] run data retention error: {e}");
```
| log::error!("[COMPACTOR] run data retention error: {e}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider including more details in the error message for run_retention to aid in debugging.
- log::error!("[COMPACTOR] run data retention error: {e}");
+ log::error!("[COMPACTOR] run data retention error for stream {stream_name}: {e}");Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| log::error!("[COMPACTOR] run data retention error: {e}"); | |
| log::error!("[COMPACTOR] run data retention error for stream {stream_name}: {e}"); |
```diff
- if ret.is_err() {
-     log::error!("[COMPACTOR] run files delete error: {}", ret.err().unwrap());
+ if let Err(e) = compact::run_delay_deletion().await {
+     log::error!("[COMPACTOR] run data delay deletion error: {e}");
```
| log::error!("[COMPACTOR] run data delay deletion error: {e}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enhance the error message in run_delay_deletion to provide more context about the failure.
- log::error!("[COMPACTOR] run data delay deletion error: {e}");
+ log::error!("[COMPACTOR] run data delay deletion error during processing stream {stream_name}: {e}");Committable suggestion was skipped due to low confidence.
```diff
  log::debug!("[COMPACTOR] Running sync cached compact offset to db");
  if let Err(e) = crate::service::db::compact::files::sync_cache_to_db().await {
-     log::error!("[COMPACTOR] run offset sync cache to db error: {}", e);
+     log::error!("[COMPACTOR] run sync cached compact offset to db error: {e}");
```
| log::error!("[COMPACTOR] run sync cached compact offset to db error: {e}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enhance the error message in run_sync_to_db to include more specific details about the synchronization failure.
- log::error!("[COMPACTOR] run sync cached compact offset to db error: {e}");
+ log::error!("[COMPACTOR] run sync cached compact offset to db error for stream {stream_name}: {e}");Committable suggestion was skipped due to low confidence.
Working flow
New environments
- `ZO_COMPACT_BATCH_SIZE=100`: default 100, batch size for compact get pending jobs
- `ZO_COMPACT_JOB_RUN_TIMEOUT=600`: default 10 mins; if a compact job is not finished in this time, it will be marked as failed
- `ZO_COMPACT_JOB_CLEAN_WAIT_TIME=86400`: default 1 day; clean the jobs which have been finished for more than this time
New Features
Improvements
Bug Fixes
Refactor