@hengfeiyang hengfeiyang commented Jun 18, 2024

Workflow

[workflow diagram]
  • meta (table): metadata table where the compact offset is stored
  • file_list_jobs (table): file list job table
  • stream job: generates merging jobs for streams
  • merging job: fetches pending jobs and merges the files
  • checking job: resets jobs that have timed out or whose node has died
  • clean job: cleans up jobs that have been done for more than 1 day
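
A rough sketch of how these four loops could be wired up (a minimal illustration assuming tokio as the async runtime; the names and intervals here are placeholders, not the PR's code):

#[tokio::main]
async fn main() {
    // One background task per job described above.
    tokio::spawn(run_loop("generate merge jobs"));  // stream job
    tokio::spawn(run_loop("merge pending jobs"));   // merging job
    tokio::spawn(run_loop("reset timed-out jobs")); // checking job
    tokio::spawn(run_loop("clean done jobs"));      // clean job
    std::future::pending::<()>().await; // keep the runtime alive
}

async fn run_loop(name: &'static str) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        println!("[COMPACTOR] running: {name}");
    }
}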

New environment variables

  • ZO_COMPACT_BATCH_SIZE=100: default 100; batch size for fetching pending compact jobs
  • ZO_COMPACT_JOB_RUN_TIMEOUT=600: default 600 seconds (10 minutes); if a compact job does not finish within this time, it is marked as failed
  • ZO_COMPACT_JOB_CLEAN_WAIT_TIME=86400: default 86400 seconds (1 day); jobs that finished longer ago than this are cleaned up
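
For orientation, a hedged sketch of how these settings could sit on the Compact config struct (field names come from the change summary below; types and derives are assumptions):

#[derive(Debug, Clone)]
pub struct Compact {
    /// ZO_COMPACT_BATCH_SIZE: pending jobs fetched per batch (default 100)
    pub batch_size: i64,
    /// ZO_COMPACT_JOB_RUN_TIMEOUT: seconds before a running job is reset (default 600)
    pub job_run_timeout: i64,
    /// ZO_COMPACT_JOB_CLEAN_WAIT_TIME: seconds to keep done jobs (default 86400)
    pub job_clean_wait_time: i64,
}

impl Default for Compact {
    fn default() -> Self {
        Self {
            batch_size: 100,
            job_run_timeout: 600,
            job_clean_wait_time: 86400,
        }
    }
}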

Summary by CodeRabbit

  • New Features

    • Added new public fields to configuration for better job management.
    • Introduced new asynchronous functions for job processing in the compactor module.
    • Expanded list of stream types with a new constant array.
    • Added a function to get the current timestamp in microseconds.
  • Improvements

    • Enhanced log messages for better clarity in job processing.
    • Updated job generation logic for merging tasks to improve efficiency.
    • Corrected error messages for better clarity in SQL expressions.
  • Bug Fixes

    • Fixed an error message related to unsupported expressions in group by clauses.
  • Refactor

    • Restructured job and task handling logic for improved performance and reliability.

coderabbitai bot commented Jun 18, 2024

Walkthrough

This update enhances job management and processing capabilities within the codebase. Key changes include adding new fields to configuration structs, updating error messages, and introducing functions for precise time measurement. Significant improvements in file and job handling, including new methods for managing merge jobs, a refined merging process, and updated logging, streamline operations and ensure more robust error handling.

Changes

Files Change Summaries
src/config/src/config.rs Added public fields and validation logic to Compact struct, including batch_size, job_run_timeout, and job_clean_wait_time.
src/config/src/meta/sql.rs Corrected the error message for unsupported expression types in group by clauses.
src/config/src/utils/time.rs Introduced a new public function now_micros to get the current timestamp in microseconds.
src/config/src/meta/stream.rs Added ALL_STREAM_TYPES constant array with seven StreamType values.
src/config/src/utils/inverted_index.rs Added split_token function for string tokenization, filtering short tokens.
src/config/src/utils/mod.rs Added a new module inverted_index under the utils module.
src/infra/src/file_list/mod.rs Enhanced FileList trait with new methods for managing merge jobs, and introduced new structs for job management.
src/job/compactor.rs Added asynchronous functions for job generation, checking running jobs, and cleaning completed jobs, with improved timing logic.
src/job/files/idx.rs Updated log message formatting in the write_to_disk function.
src/service/compact/merge.rs Refactored job generation logic, separating it from the merging process, and introduced new method parameters.
src/service/compact/mod.rs Split run_merge into run_generate_job and run_merge for better task and job handling.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Config
    participant Compactor
    participant JobManager
    participant Logger

    User->>Compactor: Initiate Job
    Compactor->>Config: Load Config
    Config-->>Compactor: Send Config Values
    Compactor->>JobManager: Generate Job
    JobManager->>JobManager: Add Job to Queue
    JobManager->>Compactor: Send job ID and offsets
    Compactor->>Logger: Log Job Details
    JobManager->>JobManager: Check Running Jobs
    JobManager->>Logger: Log Job Status

    Note right of Logger: Enhanced logging details
    Logger-->>Config: Log configuration changes
    Logger-->>Compactor: Log Compactor activities
sequenceDiagram
    participant System
    participant Timestamp
    participant FileList
    participant Job
    participant StreamType

    System->>Timestamp: Call now_micros()
    Timestamp-->>System: Current Timestamp in Microseconds
    System->>FileList: Manage Merge Jobs
    FileList->>Job: Create/Update Job Records
    FileList->>StreamType: Get Stream Type
    StreamType-->>FileList: Stream Type Data
    FileList->>System: Job Details for Merge


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 17

Outside diff range and nitpick comments (12)
src/service/compact/mod.rs (1)

Line range hint 170-249: The function run_generate_job should handle exceptions more robustly, especially considering it interacts with multiple external systems.

src/infra/src/file_list/mysql.rs (4)

714-722: Review the necessity of updating updated_at without other changes.

The method update_running_jobs only updates the updated_at field. If this is intended to refresh the job status implicitly, consider adding a comment explaining this. Otherwise, review if this update is necessary as it may lead to unnecessary write loads on the database.


724-739: Improve logging for reset operations.

The method check_running_jobs logs a warning when resetting jobs, which might be confused with an error. Consider changing this to an informational log unless the reset indicates an abnormal situation.

if ret.rows_affected() > 0 {
    log::info!("[MYSQL] reset running jobs status to pending");
}

919-934: Ensure the new table structure supports future scalability.

The creation of file_list_jobs with fields like offsets and status is crucial for job management. Consider indexing more columns if query performance becomes an issue as the data grows, especially on fields frequently used in WHERE clauses.


994-1001: Review the necessity of multiple indexes on file_list_jobs.

Multiple indexes have been created on file_list_jobs. While indexes improve query performance, they also increase the storage and maintenance overhead. Review if all these indexes are necessary or if some can be combined for efficiency.

src/infra/src/file_list/postgres.rs (3)

741-756: Add logging for successful job resets in check_running_jobs.

It's good to have logging for when jobs are reset to pending, but it might also be useful to log when the operation completes successfully without any resets. This could help in debugging and understanding the system's behavior under normal conditions.


758-769: Consider adding metrics or monitoring for the clean_done_jobs method.

The method clean_done_jobs is crucial for maintaining the health of the job queue. Consider adding metrics or monitoring around this method to track how often it's called and its performance, which could help in proactive system maintenance.


1013-1020: Validate the necessity of unique and status indexes on file_list_jobs.

The unique index on stream and offsets and the status index might be crucial for performance, but it's also important to validate their necessity to avoid over-indexing which can slow down write operations.

src/service/compact/merge.rs (3)

Line range hint 80-195: Consider handling potential errors more gracefully in generate_job_by_stream.

The function generate_job_by_stream returns early in multiple places without logging the reason for these early returns, which could make debugging difficult. Consider adding logging before each return statement to provide more context on why a particular path was taken.


Line range hint 76-195: Refactor the generate_job_by_stream function to improve readability.

The function generate_job_by_stream is quite long and handles multiple levels of logic. Consider breaking this function into smaller, more focused functions, each handling a specific part of the job generation process. This would improve readability and maintainability.


Line range hint 207-508: Consider modularizing the merge_by_stream function.

The merge_by_stream function is extensive and handles many operations from acquiring locks to updating the database. Splitting this function into smaller, more manageable parts could enhance clarity and make the code easier to maintain.

web/src/plugins/logs/SearchBar.vue (1)

Line range hint 918-930: Validate input before assignment to avoid potential issues.

The refactoring of the downloadRangeData method to include input validation is a positive change, ensuring that the initial number is a positive number before proceeding with the download logic. However, it's crucial to perform this validation before assigning the value to this.searchObj.data.customDownloadQueryObj.query.from. This ensures that only valid data affects the application state, enhancing both security and reliability.

-      this.searchObj.data.customDownloadQueryObj.query.from = initNumber;
+      // This line has been moved above to ensure validation before assignment
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 9964626 and e397079.

Files selected for processing (11)
  • src/config/src/config.rs (2 hunks)
  • src/config/src/meta/sql.rs (1 hunks)
  • src/config/src/utils/time.rs (1 hunks)
  • src/infra/src/file_list/mod.rs (4 hunks)
  • src/infra/src/file_list/mysql.rs (3 hunks)
  • src/infra/src/file_list/postgres.rs (4 hunks)
  • src/infra/src/file_list/sqlite.rs (3 hunks)
  • src/job/compactor.rs (4 hunks)
  • src/service/compact/merge.rs (9 hunks)
  • src/service/compact/mod.rs (4 hunks)
  • web/src/plugins/logs/SearchBar.vue (2 hunks)
Additional context used
Path-based instructions (10)
src/job/compactor.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/utils/time.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/mysql.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/postgres.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/sqlite.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/sql.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/merge.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/config.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (14)
src/infra/src/file_list/mod.rs (1)

118-129: Ensure that the new methods related to merge job management (add_job, get_pending_jobs, set_job_done, update_running_jobs, check_running_jobs, clean_done_jobs) handle exceptions and errors appropriately, especially in asynchronous contexts.
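
For reference, a hedged sketch of that trait surface (signatures inferred from the implementations quoted later in this review; the update_running_jobs parameter in particular is an assumption):

#[async_trait::async_trait]
pub trait FileList: Sync + Send + 'static {
    async fn add_job(&self, org_id: &str, stream_type: StreamType, stream: &str, offset: i64) -> Result<()>;
    async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<MergeJobRecord>>;
    async fn set_job_done(&self, id: i64) -> Result<()>;
    async fn update_running_jobs(&self, id: i64) -> Result<()>; // parameter assumed
    async fn check_running_jobs(&self, before_date: i64) -> Result<()>;
    async fn clean_done_jobs(&self, before_date: i64) -> Result<()>;
}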

src/config/src/utils/time.rs (1)

37-40: The function now_micros is a useful addition for high-precision time requirements. Ensure that it is used consistently across the project where such precision is necessary.
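
As a rough illustration, now_micros could be as small as this (a sketch assuming the chrono crate; the actual implementation may differ):

pub fn now_micros() -> i64 {
    // Unix timestamp in microseconds, UTC.
    chrono::Utc::now().timestamp_micros()
}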

src/infra/src/file_list/mysql.rs (2)

626-701: Optimize transaction handling for better performance.
[REFACTOR_SUGGESTION]
This method involves multiple database operations within a transaction, which could be optimized. Consider consolidating the database calls or using batch operations where possible to reduce the overhead of transaction management and improve performance.


741-751: Validate the logic for cleaning up jobs.

The clean_done_jobs method deletes jobs based on the updated_at timestamp and job status. Verify that this logic correctly identifies only the intended jobs to prevent accidental deletion of active jobs.

Verification successful

The gathered context provides a clear view of how the clean_done_jobs function is utilized. The run_clean_done_jobs function in src/job/compactor.rs runs in a loop, periodically invoking clean_done_jobs with a calculated updated_at timestamp, ensuring that only jobs older than a certain threshold are targeted. This logic appears sound and should prevent the accidental deletion of active jobs, as it relies on the updated_at timestamp and the job status.

  • src/job/compactor.rs:

    • run_clean_done_jobs calculates an updated_at timestamp based on a configured wait time.
    • It calls clean_done_jobs with this timestamp in a loop, ensuring periodic cleanup.
  • src/infra/src/file_list/mod.rs:

    • The clean_done_jobs function is defined and called through the CLIENT interface.

The logic for cleaning up jobs based on the updated_at timestamp and job status appears to be correctly implemented.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Check for any active jobs that might be incorrectly marked for deletion.
rg --type rust "clean_done_jobs"

Length of output: 887



Script:

#!/bin/bash
# Extract the implementation of run_clean_done_jobs from src/job/compactor.rs
ast-grep --lang rust --pattern 'async fn run_clean_done_jobs() -> Result<$_, $_> { $$$ }' 

# Extract the relevant parts of src/infra/src/file_list/mod.rs where clean_done_jobs is defined and called
rg -A 10 'async fn clean_done_jobs' src/infra/src/file_list/mod.rs

Length of output: 1526

src/infra/src/file_list/postgres.rs (4)

720-728: Optimize the set_job_done method by caching the database connection.
[PERFORMANCE]
The method set_job_done retrieves a new database connection on every call. This could be optimized by reusing a connection or using a connection pool. Consider modifying the method to accept a database connection as a parameter or use a global/static connection pool.


731-738: Review the necessity of frequent time updates in update_running_jobs.
[PERFORMANCE]
The method update_running_jobs updates the updated_at timestamp every time it's called. If this method is called frequently, it could lead to performance issues due to the constant updates. Consider if there's a way to reduce the frequency of these updates or batch them.


630-718: Ensure proper transaction handling in get_pending_jobs.

The method get_pending_jobs uses a transaction which is good for consistency. However, the rollback scenarios are scattered and might be missed in future modifications. Consider using a transaction management pattern or a context manager that ensures the rollback in all error scenarios.
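
One hedged sketch of such a pattern, not taken from this PR: a helper that commits on success and rolls back on any error, so call sites cannot forget the rollback (sqlx with Postgres assumed):

use futures::future::BoxFuture;
use sqlx::{PgPool, Postgres, Transaction};

// Run `f` inside a transaction; commit on Ok, best-effort rollback on Err.
async fn with_tx<T>(
    pool: &PgPool,
    f: impl for<'c> FnOnce(&'c mut Transaction<'static, Postgres>) -> BoxFuture<'c, Result<T, sqlx::Error>>,
) -> Result<T, sqlx::Error> {
    let mut tx = pool.begin().await?;
    match f(&mut tx).await {
        Ok(v) => {
            tx.commit().await?;
            Ok(v)
        }
        Err(e) => {
            let _ = tx.rollback().await; // report the original error, not the rollback's
            Err(e)
        }
    }
}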

Verification successful

The extracted code for the get_pending_jobs function reveals that transaction rollbacks are indeed handled in various error scenarios. Each Err(e) block includes a tx.rollback().await call, ensuring that transactions are rolled back upon encountering errors. Here are the key points:

  • Rollback is handled after failing to acquire the advisory lock.
  • Rollback is handled after failing to fetch pending jobs.
  • Rollback is handled if there are no pending jobs to update.
  • Rollback is handled after failing to update job statuses.
  • Rollback is handled after failing to fetch jobs by IDs.

Given this thorough handling of transaction rollbacks, the initial review comment's concern about scattered rollbacks is mitigated.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that all error paths in get_pending_jobs handle transactions correctly
rg --type rust "tx.rollback().await" "src/infra/src/file_list/postgres.rs"

Length of output: 74



Script:

#!/bin/bash
# Extract the `get_pending_jobs` function from `postgres.rs` for manual review
ast-grep --lang rust --pattern $'impl $_ {
    $$$
    async fn get_pending_jobs(&self, $_, $_) -> $_ {
        $$$
    }
    $$$
}' "src/infra/src/file_list/postgres.rs"

Length of output: 169



Script:

#!/bin/bash
# Extract lines 630-718 from `postgres.rs` for manual review
sed -n '630,718p' src/infra/src/file_list/postgres.rs

Length of output: 3356


938-953: Ensure the new file_list_jobs table is indexed properly.

The creation of the file_list_jobs table includes setting up indexes. It's important to verify that these indexes are appropriate for the queries run against this table to ensure optimal performance.

Verification successful

The file_list_jobs table is indexed with the file_list_jobs_stream_status_idx index on the status and stream columns in the src/infra/src/file_list/postgres.rs file. This confirms that an index is created for the table.

  • Index Creation: src/infra/src/file_list/postgres.rs: "CREATE INDEX IF NOT EXISTS file_list_jobs_stream_status_idx on file_list_jobs (status, stream);"
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that indexes on file_list_jobs are used in queries
rg --type rust "FROM file_list_jobs" "src/infra/src/file_list/postgres.rs" | rg "INDEX"

Length of output: 85



Script:

#!/bin/bash
# Search for index creation commands related to file_list_jobs table
rg --type sql "CREATE INDEX" | rg "file_list_jobs"

Length of output: 199



Script:

#!/bin/bash
# Search for index creation commands related to file_list_jobs table without file type filter
rg "CREATE INDEX" | rg "file_list_jobs"

Length of output: 461

src/infra/src/file_list/sqlite.rs (4)

718-728: Ensure proper error handling and logging in set_job_done.

Currently, the function does not log any information on success or failure, which could hinder debugging. Adding logging could improve observability.

sqlx::query(r#"UPDATE file_list_jobs SET status = $1, updated_at = $2 WHERE id = $3;"#)
    .bind(super::FileListJobStatus::Done)
    .bind(config::utils::time::now_micros())
    .bind(id)
    .execute(&*client)
    .await
    .map_err(|e| {
        log::error!("Failed to set job done: {}", e);
        e // let `?` convert via From; returning `e.into()` here cannot infer a type
    })?;
log::info!("Job {} marked as done", id);
Ok(())

945-961: Ensure the new table creation adheres to schema requirements.

The creation of the file_list_jobs table should be verified to ensure it meets all schema and business logic requirements. This includes checking constraints, types, and indexes.

sqlx::query(r#"CREATE TABLE IF NOT EXISTS file_list_jobs (...) "#).execute(&*client).await?;
log::info!("Table file_list_jobs created successfully.");
Ok(())

1020-1026: Check the uniqueness and indexing strategy for file_list_jobs.

The unique index on stream and offsets might cause issues if the same stream can have jobs with identical offsets. Verify that this indexing strategy aligns with the expected usage patterns.

Verification successful

The search results provide extensive information on the usage and constraints related to the file_list_jobs table across different database systems (SQLite, MySQL, Postgres). Key observations include:

  • The file_list_jobs table schema consistently includes stream and offsets as non-nullable fields.
  • There are unique indexes created on the combination of stream and offsets in all database systems.
  • Multiple instances of INSERT INTO file_list_jobs statements suggest that stream and offsets are frequently used together, and there are mechanisms to handle unique constraint violations.
  • Queries and updates frequently interact with the file_list_jobs table, indicating its active role in job management.

Given these observations, the unique index on stream and offsets seems intentional and aligns with the expected usage patterns. There are no immediate indications of issues with this indexing strategy.

  • src/infra/src/file_list/sqlite.rs
  • src/infra/src/file_list/mysql.rs
  • src/infra/src/file_list/postgres.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Check for potential issues with unique index on stream and offsets
echo "Add verification logic here"

Length of output: 64



Script:

#!/bin/bash
# Search for the definition of the file_list_jobs table and any associated constraints or indexes
rg "CREATE TABLE.*file_list_jobs" -A 20

Length of output: 3761



Script:

#!/bin/bash
# Search for comments or documentation related to the usage patterns of stream and offsets
rg -i "stream.*offsets" -A 5

# Search for queries or code that interact with the file_list_jobs table
rg -i "file_list_jobs" -A 10

Length of output: 35474


759-771: Validate the logic for cleaning done jobs.

The function deletes jobs based on status and update time. It's crucial to ensure that this does not accidentally delete jobs that are still needed. A verification step or additional logging could be beneficial.

Verification successful

The FileListJobStatus enum is defined with three variants: Pending, Running, and Done. The clean_done_jobs function deletes jobs with the status Done that were updated before a specified date. This logic appears to be sound, as it targets only jobs that are marked as Done and have not been updated recently.

  • The FileListJobStatus enum definition is located in src/infra/src/file_list/mod.rs:
    pub enum FileListJobStatus {
        #[default]
        Pending,
        Running,
        Done,
    }

Given this context, the function's logic should be verified to ensure it does not accidentally delete essential jobs. However, based on the current information, the implementation seems appropriate for its intended purpose.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that no essential jobs are deleted unintentionally
# This script should check the job statuses and their last update times to ensure correctness.
echo "Add verification logic here"

Length of output: 64



Script:

#!/bin/bash
# Find the definition and usage of FileListJobStatus::Done
ast-grep --lang rust --pattern 'enum FileListJobStatus { Done, $$$ }'

Length of output: 71



Script:

#!/bin/bash
# Find the definition of FileListJobStatus enum
ast-grep --lang rust --pattern 'enum FileListJobStatus { $$$ }'

Length of output: 65



Script:

#!/bin/bash
# Search for the term FileListJobStatus in the codebase
rg 'FileListJobStatus'

Length of output: 1858



Script:

#!/bin/bash
# View the definition of FileListJobStatus in src/infra/src/file_list/mod.rs
cat src/infra/src/file_list/mod.rs | grep -A 10 'pub enum FileListJobStatus'

Length of output: 160

src/config/src/meta/sql.rs (1)

265-265: Updated error message for Group conversion improves clarity and specificity.

src/config/src/config.rs (1)

873-890: Ensure the new configuration fields for the compactor module are well-documented and the default values are appropriate.

The new settings batch_size, job_run_timeout, and job_clean_wait_time are crucial for managing job operations efficiently in the compactor module. The default values seem reasonable, but you might want to ensure they align with expected operational parameters in production environments.
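
A hedged sketch of what that validation might look like, reusing the Compact fields sketched near the top of this page (the fallback-to-default behavior is an assumption):

fn check_compact_config(cfg: &mut Compact) {
    // Fall back to the documented defaults when a value is out of range.
    if cfg.batch_size < 1 {
        cfg.batch_size = 100;
    }
    if cfg.job_run_timeout < 1 {
        cfg.job_run_timeout = 600;
    }
    if cfg.job_clean_wait_time < 1 {
        cfg.job_clean_wait_time = 86400;
    }
}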

Comment on lines 106 to 117
async fn run_generate_job() -> Result<(), anyhow::Error> {
    loop {
        time::sleep(time::Duration::from_secs(get_config().compact.interval)).await;
        let locker = compact::QUEUE_LOCKER.clone();
        let locker = locker.lock().await;
        log::debug!("[COMPACTOR] Running generate merge job");
        if let Err(e) = compact::run_generate_job().await {
            log::error!("[COMPACTOR] run generate merge job error: {e}");
        }
        drop(locker);
    }
}

Tip

Codebase Verification

The run_generate_job function in src/service/compact/mod.rs logs errors from merge::generate_job_by_stream but does not escalate or terminate on these errors. This may suppress critical errors that should lead to termination or further action.

  • src/service/compact/mod.rs: The run_generate_job function logs errors but does not handle them beyond logging.
Analysis chain

Verify that the error handling in run_generate_job does not suppress critical errors that should lead to a termination or escalation.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify error handling paths in run_generate_job function.
ast-grep --lang rust --pattern $'async fn run_generate_job($_) {
  $$$
  Err($e) {
    $$$
  }
  $$$
}'

Length of output: 104



Script:

#!/bin/bash
# Find the implementation of the `compact::run_generate_job` function to analyze its error handling.
ast-grep --lang rust --pattern 'async fn run_generate_job($_) { $$$ }'

Length of output: 72



Script:

#!/bin/bash
# Search for the `run_generate_job` function definition within the `compact` module.
rg 'async fn run_generate_job' --context 5

Length of output: 1211



Script:

#!/bin/bash
# Extract the full implementation of the `run_generate_job` function from `src/service/compact/mod.rs`.
sed -n '/pub async fn run_generate_job/,/}/p' src/service/compact/mod.rs

Length of output: 606



Script:

#!/bin/bash
# Extract the full implementation of the `run_generate_job` function from `src/service/compact/mod.rs`, ensuring it captures all lines.
sed -n '/pub async fn run_generate_job/,/^}/p' src/service/compact/mod.rs

Length of output: 2850

Comment on lines 174 to 193
async fn run_check_running_jobs() -> Result<(), anyhow::Error> {
    loop {
        let timeout = get_config().compact.job_run_timeout;
        time::sleep(time::Duration::from_secs(timeout as u64)).await;
        log::debug!("[COMPACTOR] Running check running jobs");
        let updated_at = config::utils::time::now_micros() - (timeout * 1000 * 1000);
        if let Err(e) = infra::file_list::check_running_jobs(updated_at).await {
            log::error!("[COMPACTOR] run check running jobs error: {e}",);
        }
    }
}

async fn run_clean_done_jobs() -> Result<(), anyhow::Error> {
    loop {
        let wait_time = get_config().compact.job_clean_wait_time;
        time::sleep(time::Duration::from_secs(wait_time as u64)).await;
        log::debug!("[COMPACTOR] Running clean done jobs");
        let updated_at = config::utils::time::now_micros() - (wait_time * 1000 * 1000);
        if let Err(e) = infra::file_list::clean_done_jobs(updated_at).await {
            log::error!("[COMPACTOR] run clean done jobs error: {e}");

Tip

Codebase Verification

The functions run_check_running_jobs and run_clean_done_jobs in src/job/compactor.rs do include error logging when they encounter issues with checking or cleaning jobs. However, there is no additional logic to handle edge cases beyond logging the errors.

  • run_check_running_jobs logs an error if infra::file_list::check_running_jobs(updated_at).await fails.
  • run_clean_done_jobs logs an error if infra::file_list::clean_done_jobs(updated_at).await fails.

These functions should ideally include more robust error handling mechanisms to address edge cases, such as retrying the operation, marking jobs as failed, or sending alerts.

  • File: src/job/compactor.rs
  • Lines: 174-193
Analysis chain

Check that the logic for run_check_running_jobs and run_clean_done_jobs correctly handles edge cases, such as jobs that fail to update or clean up.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify edge case handling in job management functions.
ast-grep --lang rust --pattern $'async fn run_check_running_jobs($_) {
  $$$
  if ($condition) {
    $$$
  }
  $$$
}'

Length of output: 118



Script:

#!/bin/bash
# Searching for the implementations of the functions `run_check_running_jobs` and `run_clean_done_jobs`
ast-grep --lang rust --pattern 'async fn run_check_running_jobs($_) { $$$ }'
ast-grep --lang rust --pattern 'async fn run_clean_done_jobs($_) { $$$ }'

Length of output: 154



Script:

#!/bin/bash
# Searching for the definitions of the functions `run_check_running_jobs` and `run_clean_done_jobs` in the codebase
rg 'async fn run_check_running_jobs'
rg 'async fn run_clean_done_jobs'

Length of output: 243



Script:

#!/bin/bash
# Extracting the implementations of the functions `run_check_running_jobs` and `run_clean_done_jobs` from `src/job/compactor.rs`
sed -n '/async fn run_check_running_jobs()/,/^}/p' src/job/compactor.rs
sed -n '/async fn run_clean_done_jobs()/,/^}/p' src/job/compactor.rs

Length of output: 1199

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Outside diff range and nitpick comments (3)
src/service/compact/merge.rs (3)

Line range hint 76-195: Refactor the function generate_job_by_stream to handle job generation by stream.

  • The function now correctly handles multiple conditions to ensure that only one node processes a job at a time. It uses distributed locks and checks if another node is already processing the job.
  • The use of LOCAL_NODE_UUID to check and set the node processing the job is a robust approach.
  • The function logs the generation of the job which aids in debugging and monitoring.
  • Consider adding more detailed comments about the conditions and logic used, especially around the distributed lock handling and node checks, to improve maintainability.

Line range hint 197-512: Refactor the function merge_by_stream to handle the merging of files based on stream.

  • This function has been significantly refactored to handle different steps of merging files based on a job queue.
  • The function now includes detailed logging at various steps which will help in monitoring and debugging.
  • There is a comprehensive check for file sizes and partition keys which ensures that only relevant files are merged.
  • The error handling and retry mechanisms are robust, particularly in the file list operations.
  • Consider simplifying some of the nested logic, especially around file and partition checks, to improve readability and maintainability.

Line range hint 876-1000: Enhance error handling in the write_file_list_db_only function.

  • The function attempts multiple retries when writing to the external database which is good for resilience.
  • The use of separate lists for flattened and non-flattened files is a good practice as it allows for more targeted operations.
  • Consider implementing an exponential backoff mechanism for retries to handle high load or transient issues more effectively.
  • Adding more detailed logging about the success or failure of each operation within the retry loop would be beneficial for troubleshooting.
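
A hedged sketch of the suggested exponential backoff, written as a generic helper rather than the PR's actual retry loop (the constants are illustrative):

use std::time::Duration;

// Retry an async operation, doubling the delay after each failure.
async fn with_backoff<F, Fut, T, E>(mut op: F, max_retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt < max_retries => {
                log::warn!("attempt {attempt} failed: {e}; retrying in {delay:?}");
                tokio::time::sleep(delay).await;
                delay *= 2;
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}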
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e397079 and 65974e1.

Files selected for processing (1)
  • src/service/compact/merge.rs (9 hunks)
Additional context used
Path-based instructions (1)
src/service/compact/merge.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 65974e1 and 4ade497.

Files selected for processing (1)
  • src/service/traces/otlp_http.rs (1 hunks)
Additional context used
Path-based instructions (1)
src/service/traces/otlp_http.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (1)
src/service/traces/otlp_http.rs (1)

46-54: Improved error handling by using a match statement to return a detailed BadRequest response if decoding fails.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Outside diff range and nitpick comments (5)
src/config/src/utils/inverted_index.rs (1)

20-41: The split_token function is well-implemented for dynamic token splitting and filtering. Consider adding a brief comment explaining the use of INDEX_MIN_CHAR_LEN for clarity.
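
For orientation, a hedged guess at the shape of split_token (the delimiter rule, lowercasing, and the INDEX_MIN_CHAR_LEN value are assumptions based on the description, not the PR's code):

const INDEX_MIN_CHAR_LEN: usize = 3; // assumed threshold

pub fn split_token(s: &str) -> Vec<String> {
    // Split on non-alphanumeric delimiters, drop short tokens, de-duplicate.
    let mut tokens: Vec<String> = s
        .to_lowercase()
        .split(|c: char| !c.is_alphanumeric())
        .filter(|t| t.len() >= INDEX_MIN_CHAR_LEN)
        .map(str::to_string)
        .collect();
    tokens.sort();
    tokens.dedup();
    tokens
}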

src/job/files/parquet.rs (4)

Line range hint 818-941: Consider optimizing the prepare_index_record_batches function for better performance and maintainability.

  • The function could benefit from a more efficient handling of null columns and term extraction.
  • Consider parallel processing or batch processing for handling large datasets.

Line range hint 960-1003: Ensure robust error handling in move_single_file.

  • The function should handle potential failures in file retrieval and record batch creation more gracefully.
  • Consider adding retry mechanisms or more informative logging to help diagnose issues.

Line range hint 675-767: Improve the generate_index_on_ingester function for better error handling and efficiency.

  • The function could be refactored to reduce complexity and improve error propagation.
  • Consider using more efficient data structures or algorithms for handling large data volumes.

Line range hint 1004-1100: Optimize the merge_files function for better performance and reliability.

  • The function could be optimized to handle large numbers of files more efficiently.
  • Consider implementing parallel processing or optimizing file I/O operations.
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 4ade497 and d88a905.

Files selected for processing (8)
  • src/config/src/config.rs (5 hunks)
  • src/config/src/utils/inverted_index.rs (1 hunks)
  • src/config/src/utils/mod.rs (1 hunks)
  • src/ingester/src/writer.rs (1 hunks)
  • src/job/files/parquet.rs (7 hunks)
  • src/service/search/cluster/mod.rs (2 hunks)
  • src/wal/src/lib.rs (1 hunks)
  • web/src/plugins/logs/SearchBar.vue (2 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/config/src/config.rs
  • web/src/plugins/logs/SearchBar.vue
Additional context used
Path-based instructions (6)
src/config/src/utils/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/wal/src/lib.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/utils/inverted_index.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/ingester/src/writer.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/job/files/parquet.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/search/cluster/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (6)
src/config/src/utils/mod.rs (1)

23-23: The addition of the inverted_index module aligns with the enhancements described in the PR summary.

src/wal/src/lib.rs (1)

28-29: Standardizing the file type identifier with constants improves code maintainability and readability.

src/ingester/src/writer.rs (1)

287-287: Using FILE_TYPE_IDENTIFIER_LEN improves the clarity of the condition. Ensure the complex logical condition accurately implements the intended checks.

src/job/files/parquet.rs (1)

Line range hint 767-818: Ensure consistency and reliability in generate_index_on_compactor.

  • Verify that the function consistently handles edge cases and large datasets.
  • Consider adding more comprehensive tests to cover various scenarios and data sizes.
src/service/search/cluster/mod.rs (2)

30-30: The import of split_token is correctly placed and used effectively in the search logic.


121-121: Usage of split_token function is correctly implemented for processing search terms in a flexible and efficient manner.
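
A hypothetical usage, consistent with the split_token sketch given earlier in this review:

let terms = split_token("GET /api/v1/search?q=error");
// With an assumed minimum token length of 3: ["api", "error", "get", "search"]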

@hengfeiyang hengfeiyang marked this pull request as draft June 19, 2024 10:10
@hengfeiyang hengfeiyang marked this pull request as ready for review June 20, 2024 12:50
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 10

Outside diff range and nitpick comments (2)
src/service/compact/merge.rs (2)

Line range hint 76-195: Consider simplifying the generate_job_by_stream function.

The function generate_job_by_stream is quite lengthy and handles multiple responsibilities, including database operations, locking, and conditional checks. It would improve readability and maintainability to break this function into smaller, more focused functions, each handling a specific part of the logic.


Line range hint 197-499: Refactor merge_by_stream to improve error handling and reduce complexity.

The merge_by_stream function is complex and handles many operations. Consider breaking it down into smaller functions or methods, each responsible for parts of the workflow like fetching schema, querying files, and handling the merge process. Also, ensure that all potential error paths are handled gracefully and provide meaningful error messages.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between d88a905 and 071586d.

Files selected for processing (10)
  • src/config/src/meta/stream.rs (1 hunks)
  • src/config/src/utils/time.rs (1 hunks)
  • src/infra/src/file_list/mod.rs (4 hunks)
  • src/infra/src/file_list/mysql.rs (3 hunks)
  • src/infra/src/file_list/postgres.rs (4 hunks)
  • src/infra/src/file_list/sqlite.rs (3 hunks)
  • src/job/compactor.rs (2 hunks)
  • src/job/files/idx.rs (1 hunks)
  • src/service/compact/merge.rs (8 hunks)
  • src/service/compact/mod.rs (5 hunks)
Files skipped from review as they are similar to previous changes (3)
  • src/config/src/utils/time.rs
  • src/infra/src/file_list/mod.rs
  • src/infra/src/file_list/sqlite.rs
Additional context used
Path-based instructions (7)
src/job/files/idx.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/job/compactor.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/config/src/meta/stream.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/mysql.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/infra/src/file_list/postgres.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/compact/merge.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

Additional comments not posted (10)
src/job/files/idx.rs (1)

81-81: Improved log message format enhances clarity and traceability.

src/service/compact/mod.rs (3)

44-88: Refactored run_retention function improves handling based on stream-specific retention settings and enhances error logging with detailed stream information.


Line range hint 138-202: Updated run_generate_job function now includes checks for stream-specific settings and detailed error logging, improving reliability and maintainability.


211-343: The run_merge function has been enhanced to handle jobs based on stream settings and partition levels more efficiently, with improved error logging.

src/config/src/meta/stream.rs (1)

34-42: Adding ALL_STREAM_TYPES as a constant array is a good practice for managing multiple stream types centrally. This allows for easier maintenance and scalability.
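
A hedged sketch of the constant (the summary says seven values; the exact variant names listed here are assumptions):

pub const ALL_STREAM_TYPES: [StreamType; 7] = [
    StreamType::Logs,
    StreamType::Metrics,
    StreamType::Traces,
    StreamType::EnrichmentTables,
    StreamType::Filelist,
    StreamType::Metadata,
    StreamType::Index,
];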

src/infra/src/file_list/mysql.rs (2)

934-949: Creation of file_list_jobs table is implemented correctly.

The SQL statement for creating the file_list_jobs table is well-formed and includes all necessary fields.


1010-1016: Ensure the uniqueness and efficiency of the new indexes on file_list_jobs.

The creation of unique and non-unique indexes on the file_list_jobs table is appropriate for optimizing queries and ensuring data integrity.

src/infra/src/file_list/postgres.rs (3)

630-718: Review the transaction handling in get_pending_jobs to ensure atomicity and error recovery.

async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    let lock_key = "file_list_jobs:get_pending_jobs";
    let lock_id = config::utils::hash::gxhash::new().sum64(lock_key);
    let lock_id = if lock_id > i64::MAX as u64 {
        (lock_id >> 1) as i64
    } else {
        lock_id as i64
    };
    let lock_sql = format!("SELECT pg_advisory_xact_lock({lock_id})");
    sqlx::query(&lock_sql).execute(&mut *tx).await?;
    
    let sql = r#"
        SELECT stream, max(id) as id, COUNT(*)::BIGINT AS num
        FROM file_list_jobs 
        WHERE status = $1 
        GROUP BY stream 
        ORDER BY num DESC 
        LIMIT $2;
    "#;
    let pending_jobs = sqlx::query_as::<_, super::MergeJobPendingRecord>(sql)
        .bind(super::FileListJobStatus::Pending)
        .bind(limit)
        .fetch_all(&mut *tx)
        .await?;
    
    let ids = pending_jobs.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    if ids.is_empty() {
        tx.rollback().await?;
        return Ok(Vec::new());
    }
    
    let update_sql = format!(
        "UPDATE file_list_jobs SET status = $1, node = $2, updated_at = $3 WHERE id IN ({});",
        ids.join(",")
    );
    sqlx::query(&update_sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await?;
    
    let select_sql = format!(
        "SELECT * FROM file_list_jobs WHERE id IN ({});",
        ids.join(",")
    );
    let jobs = sqlx::query_as::<_, super::MergeJobRecord>(&select_sql)
        .fetch_all(&mut *tx)
        .await?;
    
    tx.commit().await?;
    Ok(jobs)
}

Ensure that the transaction is properly rolled back on errors and committed only when all operations are successful.


953-968: Ensure the new table file_list_jobs is appropriately indexed for performance.

The creation of the file_list_jobs table with necessary fields and types is correctly implemented. The fields are well-defined, and the SQL syntax is correct.


1029-1035: Check the uniqueness and indexing strategy on the file_list_jobs table.

The unique index on stream and offsets ensures that there are no duplicate job entries for the same stream and offset, which is crucial for the integrity of job processing. The status index will aid in efficient querying of jobs based on their current state.

Comment on lines +172 to +176
if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
    return Err(anyhow::anyhow!(
        "[COMAPCT] add file_list_jobs failed: {}",
        e
    ));

Enhance error messages for better traceability.

The error message in the catch block of infra_file_list::add_job could include more context about the failing operation, such as org_id, stream_type, and stream_name. This would make debugging easier when reading logs.

- "[COMAPCT] add file_list_jobs failed: {}"
+ "[COMAPCT] add file_list_jobs failed for {}/{}/{} with error: {}", org_id, stream_type, stream_name, e
Committable suggestion


Suggested change
-if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
-    return Err(anyhow::anyhow!(
-        "[COMAPCT] add file_list_jobs failed: {}",
-        e
-    ));
+if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
+    return Err(anyhow::anyhow!(
+        "[COMAPCT] add file_list_jobs failed for {}/{}/{} with error: {}",
+        org_id, stream_type, stream_name, e
+    ));

        .await?;
    // update job status
    if let Err(e) = infra_file_list::set_job_done(job_id).await {
        log::error!("[COMPACT] set_job_done failed: {e}");

Standardize error logging format.

Ensure consistent error logging format for better readability and maintenance. The placeholder {e} should be replaced with {} for proper formatting.

- log::error!("[COMPACT] set_job_done failed: {e}");
+ log::error!("[COMPACT] set_job_done failed: {}", e);
Committable suggestion


Suggested change
-log::error!("[COMPACT] set_job_done failed: {e}");
+log::error!("[COMPACT] set_job_done failed: {}", e);

Comment on lines +596 to +624

async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    match sqlx::query(
        "INSERT IGNORE INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES (?, ?, ?, ?, '', 0);",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await
    {
        Err(sqlx::Error::Database(e)) => if e.is_unique_violation() {
            Ok(())
        } else {
            Err(Error::Message(e.to_string()))
        },
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}

Consider refactoring the error handling in the add_job function for clarity and maintainability.

async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    match sqlx::query(
        "INSERT IGNORE INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES (?, ?, ?, ?, '', 0);",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await
    {
        // A unique violation means the job already exists; treat it as success.
        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(()),
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}

Comment on lines +626 to +701
async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    // get pending jobs group by stream and order by num desc
    let ret = match sqlx::query_as::<_, super::MergeJobPendingRecord>(
        r#"
SELECT stream, max(id) as id, CAST(COUNT(*) AS SIGNED) AS num
    FROM file_list_jobs
    WHERE status = ?
    GROUP BY stream
    ORDER BY num DESC
    LIMIT ?
    FOR UPDATE;"#,
    )
    .bind(super::FileListJobStatus::Pending)
    .bind(limit)
    .fetch_all(&mut *tx)
    .await
    {
        Ok(v) => v,
        Err(e) => {
            if let Err(e) = tx.rollback().await {
                log::error!(
                    "[MYSQL] rollback select file_list_jobs pending jobs for update error: {e}"
                );
            }
            return Err(e.into());
        }
    };
    // update jobs status to running
    let ids = ret.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    if ids.is_empty() {
        if let Err(e) = tx.rollback().await {
            log::error!("[MYSQL] rollback select file_list_jobs pending jobs error: {e}");
        }
        return Ok(Vec::new());
    }
    let sql = format!(
        "UPDATE file_list_jobs SET status = ?, node = ?, updated_at = ? WHERE id IN ({});",
        ids.join(",")
    );
    if let Err(e) = sqlx::query(&sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await
    {
        if let Err(e) = tx.rollback().await {
            log::error!("[MYSQL] rollback update file_list_jobs status error: {e}");
        }
        return Err(e.into());
    }
    // get jobs by ids
    let sql = format!(
        "SELECT * FROM file_list_jobs WHERE id IN ({});",
        ids.join(",")
    );
    let ret = match sqlx::query_as::<_, super::MergeJobRecord>(&sql)
        .fetch_all(&mut *tx)
        .await
    {
        Ok(v) => v,
        Err(e) => {
            if let Err(e) = tx.rollback().await {
                log::error!("[MYSQL] rollback select file_list_jobs by ids error: {e}");
            }
            return Err(e.into());
        }
    };
    if let Err(e) = tx.commit().await {
        log::error!("[MYSQL] commit select file_list_jobs pending jobs error: {e}");
        return Err(e.into());
    }
    Ok(ret)
}

Optimize the transaction handling in get_pending_jobs to reduce rollback boilerplate: propagate errors with ? and let the uncommitted transaction roll back automatically when it is dropped.

async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
    let pool = CLIENT.clone();
    let mut tx = pool.begin().await?;
    let ret = sqlx::query_as::<_, super::MergeJobPendingRecord>(
        r#"
SELECT stream, max(id) as id, CAST(COUNT(*) AS SIGNED) AS num
    FROM file_list_jobs 
    WHERE status = ? 
    GROUP BY stream 
    ORDER BY num DESC 
    LIMIT ? 
    FOR UPDATE;"#,
    )
    .bind(super::FileListJobStatus::Pending)
    .bind(limit)
    .fetch_all(&mut *tx)
    .await?;

    if ret.is_empty() {
        tx.rollback().await?;
        return Ok(Vec::new());
    }

    let ids = ret.iter().map(|r| r.id.to_string()).collect::<Vec<_>>();
    let sql = format!(
        "UPDATE file_list_jobs SET status = ?, node = ?, updated_at = ? WHERE id IN ({});",
        ids.join(",")
    );
    sqlx::query(&sql)
        .bind(super::FileListJobStatus::Running)
        .bind(node)
        .bind(config::utils::time::now_micros())
        .execute(&mut *tx)
        .await?;

    let ret = sqlx::query_as::<_, super::MergeJobRecord>(
        &format!("SELECT * FROM file_list_jobs WHERE id IN ({});", ids.join(","))
    )
    .fetch_all(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(ret)
}
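Either way, the property worth preserving is that the SELECT ... FOR UPDATE row locks and the flip to Running commit together, so concurrent pollers cannot claim the same rows. A hedged sketch of that guarantee (the db handle and node names are illustrative; the id field follows the r.id usage above):

// Illustrative only: run inside an async context returning Result. Two nodes
// polling concurrently receive disjoint job id sets, because each transaction
// locks the rows it selected before marking them Running.
let (a, b) = tokio::join!(
    db.get_pending_jobs("node-1", 100),
    db.get_pending_jobs("node-2", 100),
);
let a_ids: Vec<i64> = a?.iter().map(|job| job.id).collect();
assert!(b?.iter().all(|job| !a_ids.contains(&job.id)));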

Comment on lines +719 to +727
async fn set_job_done(&self, id: i64) -> Result<()> {
    let pool = CLIENT.clone();
    sqlx::query(r#"UPDATE file_list_jobs SET status = ?, updated_at = ? WHERE id = ?;"#)
        .bind(super::FileListJobStatus::Done)
        .bind(config::utils::time::now_micros())
        .bind(id)
        .execute(&pool)
        .await?;
    Ok(())

Ensure proper error handling for database operations in set_job_done.

async fn set_job_done(&self, id: i64) -> Result<()> {
    let pool = CLIENT.clone();
    let result = sqlx::query(r#"UPDATE file_list_jobs SET status = ?, updated_at = ? WHERE id = ?;"#)
        .bind(super::FileListJobStatus::Done)
        .bind(config::utils::time::now_micros())
        .bind(id)
        .execute(&pool)
        .await;

    match result {
        Ok(_) => Ok(()),
        Err(e) => {
            log::error!("[MYSQL] Error updating job status: {}", e);
            Err(e.into())
        }
    }
}
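Note that the caller shown at the top of this thread already logs set_job_done failures, so logging here as well would report the same error twice; logging at one layer and simply propagating with ? at the other keeps the output clean.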

Comment on lines +601 to +628
async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    match sqlx::query(
        "INSERT INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES ($1, $2, $3, $4, '', 0) ON CONFLICT DO NOTHING;",
    )
    .bind(org_id)
    .bind(stream_key)
    .bind(offset)
    .bind(super::FileListJobStatus::Pending)
    .execute(&pool)
    .await
    {
        Err(sqlx::Error::Database(e)) => if e.is_unique_violation() {
            Ok(())
        } else {
            Err(Error::Message(e.to_string()))
        },
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}

Consider revising the error handling in the add_job method to enhance clarity and reduce complexity.

async fn add_job(
    &self,
    org_id: &str,
    stream_type: StreamType,
    stream: &str,
    offset: i64,
) -> Result<()> {
    let stream_key = format!("{org_id}/{stream_type}/{stream}");
    let pool = CLIENT.clone();
    let sql = "INSERT INTO file_list_jobs (org, stream, offsets, status, node, updated_at) VALUES ($1, $2, $3, $4, '', 0) ON CONFLICT DO NOTHING;";
    let result = sqlx::query(sql)
        .bind(org_id)
        .bind(stream_key)
        .bind(offset)
        .bind(super::FileListJobStatus::Pending)
        .execute(&pool)
        .await;
    
    match result {
        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(()),
        Err(e) => Err(e.into()),
        Ok(_) => Ok(()),
    }
}

This refactoring simplifies the match arm and removes unnecessary nesting. It also uses a variable for the SQL query for better readability.
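One backend difference worth noting: MySQL's INSERT IGNORE downgrades several error classes (not only duplicate keys) to warnings, whereas PostgreSQL's ON CONFLICT DO NOTHING suppresses only unique and exclusion-constraint conflicts; the defensive is_unique_violation() arm is therefore kept in both variants even though the SQL itself already tolerates duplicates.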

Comment on lines +121 to +122
if let Err(e) = compact::run_merge(tx.clone()).await {
    log::error!("[COMPACTOR] run data merge error: {e}");

Consider enhancing the error message in run_merge to include more details about the error context for better debugging.

- log::error!("[COMPACTOR] run data merge error: {e}");
+ log::error!("[COMPACTOR] run data merge error on stream {stream_name} with job ID {job_id}: {e}");

Committable suggestion was skipped due to low confidence.
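A hedged sketch of where such context is actually in scope (merge_one and the job field names are assumptions based on the MergeJobRecord usage above, not this PR's code):

// Hypothetical: enrich the message inside the per-job loop, where the job
// record and therefore its id and stream key are available.
for job in jobs {
    if let Err(e) = merge_one(&job).await {
        log::error!("[COMPACTOR] merge job {} on stream {} error: {e}", job.id, job.stream);
    }
}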

- if ret.is_err() {
-     log::error!("[COMPACTOR] run data delete error: {}", ret.err().unwrap());
+ if let Err(e) = compact::run_retention().await {
+     log::error!("[COMPACTOR] run data retention error: {e}");

Consider including more details in the error message for run_retention to aid in debugging.

- log::error!("[COMPACTOR] run data retention error: {e}");
+ log::error!("[COMPACTOR] run data retention error for stream {stream_name}: {e}");

- if ret.is_err() {
-     log::error!("[COMPACTOR] run files delete error: {}", ret.err().unwrap());
+ if let Err(e) = compact::run_delay_deletion().await {
+     log::error!("[COMPACTOR] run data delay deletion error: {e}");

Enhance the error message in run_delay_deletion to provide more context about the failure.

- log::error!("[COMPACTOR] run data delay deletion error: {e}");
+ log::error!("[COMPACTOR] run data delay deletion error during processing stream {stream_name}: {e}");

Committable suggestion was skipped due to low confidence.

log::debug!("[COMPACTOR] Running sync cached compact offset to db");
if let Err(e) = crate::service::db::compact::files::sync_cache_to_db().await {
-     log::error!("[COMPACTOR] run offset sync cache to db error: {}", e);
+     log::error!("[COMPACTOR] run sync cached compact offset to db error: {e}");

Enhance the error message in run_sync_to_db to include more specific details about the synchronization failure.

- log::error!("[COMPACTOR] run sync cached compact offset to db error: {e}");
+ log::error!("[COMPACTOR] run sync cached compact offset to db error for stream {stream_name}: {e}");

Committable suggestion was skipped due to low confidence.

@hengfeiyang hengfeiyang merged commit b6d5379 into main Jun 20, 2024
@hengfeiyang hengfeiyang deleted the feat/compactor-queue branch June 20, 2024 13:45
@coderabbitai coderabbitai bot mentioned this pull request Jan 27, 2025