
Conversation

@taimingl taimingl commented Nov 4, 2024

Implements #4974

Environment variables to configure the usage reporting jobs (a sketch of how the thread-count default resolves follows the list):

  1. ZO_USAGE_REPORTING_THREAD_NUM: number of threads handling the background usage reporting job.
    default: 0
    -> local_mode: cpu_num / 2
    -> distributed: cpu_num
  2. ZO_USAGE_BATCH_SIZE: number of usage records buffered in memory before the buffered data is ingested into the usage org.
    default: 2,000
    applies to each thread
  3. ZO_USAGE_PUBLISH_INTERVAL: maximum number of seconds between ingestions, regardless of whether ZO_USAGE_BATCH_SIZE has been reached.
    default: 60s
    applies to each thread
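
A minimal sketch of how the ZO_USAGE_REPORTING_THREAD_NUM default could resolve, assuming the config exposes the CPU count and a local-mode flag (the function and parameter names here are illustrative, not the actual config code):

    // Illustrative only: 0 means auto-configure from the CPU count.
    fn resolve_usage_reporting_threads(configured: usize, cpu_num: usize, local_mode: bool) -> usize {
        if configured > 0 {
            return configured; // an explicit value always wins
        }
        let auto = if local_mode { cpu_num / 2 } else { cpu_num };
        std::cmp::max(1, auto) // never drop below one reporting thread
    }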

Summary by CodeRabbit

  • New Features

    • Introduced a configurable field for dynamic thread allocation in usage reporting based on environment settings.
    • Enhanced concurrency in the usage reporting system, allowing multiple threads to process usage messages simultaneously.
  • Bug Fixes

    • Improved error handling in logging for usage-related functions.
  • Documentation

    • Updated logging statements for better clarity and context regarding thread processing.

@taimingl taimingl requested a review from hengfeiyang November 4, 2024 22:54

coderabbitai bot commented Nov 4, 2024

Walkthrough

The changes in this pull request introduce a new public field usage_reporting_thread_num to the Limit struct in the configuration, allowing dynamic configuration of usage reporting threads based on environment settings. Additionally, the concurrency model of the usage reporting system is enhanced by wrapping the message receiver in a Mutex, enabling safe concurrent access. The functions related to usage ingestion are updated to support this new threading model, improving the system's ability to handle usage messages efficiently.
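
As a rough illustration of that model (a sketch only, assuming tokio's mpsc channel and async Mutex plus the log facade; the message variants and function names are simplified stand-ins for the real ones in src/service/usage/mod.rs):

    use std::sync::Arc;
    use tokio::sync::{mpsc, Mutex};

    enum UsageMessage {
        Data(Vec<String>), // placeholder payload type
        Shutdown,
    }

    fn spawn_usage_workers(thread_num: usize, receiver: mpsc::Receiver<UsageMessage>) {
        // Wrap the single receiver so several workers can share it safely.
        let receiver = Arc::new(Mutex::new(receiver));
        for thread_id in 0..thread_num {
            let receiver = Arc::clone(&receiver);
            tokio::spawn(async move {
                loop {
                    // Only the worker currently holding the lock waits on recv().
                    let msg = receiver.lock().await.recv().await;
                    match msg {
                        Some(UsageMessage::Data(batch)) => {
                            log::debug!("[USAGE:JOB] thread_{thread_id} received {} items", batch.len());
                            // buffer here and flush once the batch size or publish interval is hit
                        }
                        Some(UsageMessage::Shutdown) | None => break,
                    }
                }
            });
        }
    }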

Changes

File / Change Summary
  • src/config/src/config.rs: Added public field usage_reporting_thread_num: usize to the Limit struct; updated the init function to set this field based on CPU core count.
  • src/service/usage/mod.rs: Wrapped msg_receiver in Arc<Mutex<...>>; updated ingest_usage_job and ingest_buffered_usage signatures to include thread_id and msg_receiver; modified the threading model for usage reporting; enhanced logging and shutdown handling.

Possibly related PRs

  • fix: add _timestamp to usage reporting #3799: The changes in this PR involve modifications to the UsageData struct in src/config/src/meta/usage.rs, which is related to usage reporting, similar to the enhancements made in the main PR regarding the usage_reporting_thread_num field in the Limit struct.
  • feat: channel-based usage queue for async usage reporting #4949: This PR introduces a channel-based usage queue for async usage reporting, which aligns with the main PR's focus on enhancing the configurability and concurrency of usage reporting through the addition of the usage_reporting_thread_num field.

Suggested reviewers

  • hengfeiyang


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (2)
src/config/src/config.rs (1)

893-894: Add documentation for the new configuration field.

The field usage_reporting_thread_num would benefit from documentation that explains:

  • Its purpose for configuring usage reporting threads
  • The meaning of the default value 0 (auto-configuration based on CPU cores)
  • The expected range of valid values

Add rustdoc comments like this:

+/// Number of threads to handle background usage reporting jobs.
+/// Default value of 0 means:
+/// - In local mode: Use half of the available CPU cores
+/// - In distributed mode: Use all available CPU cores
+/// Valid values: 0 or any positive integer
 #[env_config(name = "ZO_USAGE_REPORTING_THREAD_NUM", default = 0)]
 pub usage_reporting_thread_num: usize,
src/service/usage/mod.rs (1)

256-256: Improve log message clarity

The log message "Returning as no usages reported" could be more informative. Including the function name or additional context can aid in debugging and improve log traceability.

Apply this diff to enhance the log message:

-log::info!("Returning as no usages reported ");
+log::info!("ingest_usages: No usage data to process.");
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 287185a and 9e760eb.

📒 Files selected for processing (2)
  • src/config/src/config.rs (2 hunks)
  • src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
src/config/src/config.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/usage/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

🔇 Additional comments (1)
src/config/src/config.rs (1)

1410-1417: LGTM! Thread initialization logic is well-implemented.

The initialization logic correctly:

  • Sets thread count based on local/distributed mode
  • Ensures at least 1 thread using std::cmp::max
  • Follows the same pattern as other thread configurations

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
src/service/usage/mod.rs (3)

57-63: Enhance thread initialization logging

Add more context to the log message to help with monitoring and debugging:

+    log::info!(
+        "[USAGE] Initializing {} usage reporting threads with batch_size={}, timeout={}s",
+        cfg.limit.usage_reporting_thread_num,
+        batch_size,
+        timeout.as_secs()
+    );
     for thread_id in 0..cfg.limit.usage_reporting_thread_num {

524-533: Improve error handling in message processing

The current implementation silently continues on error. Consider adding error context and metrics:

     let mut msg_receiver = msg_receiver.lock().await;
     tokio::select! {
         msg = msg_receiver.recv() => {
             match msg {
                 Some(UsageMessage::Data(usage_buf)) => {
                     log::debug!("[USAGE:JOB] thread_{thread_id} received and queued {} messages.", usage_buf.len());
                     usage_report_runner.push(usage_buf);
                     if usage_report_runner.should_process() {
                         let buffered = usage_report_runner.take_batch();
-                        ingest_buffered_usage(thread_id, buffered).await;
+                        if let Err(e) = ingest_buffered_usage(thread_id, buffered).await {
+                            log::error!("[USAGE:JOB] thread_{thread_id} failed to ingest buffered usage: {}", e);
+                            metrics::USAGE_INGEST_ERRORS.inc();
+                        }
                     }
                 }

563-567: Return Result from ingest_buffered_usage for proper error handling

The function should propagate errors to allow the caller to handle them appropriately:

-async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
+async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) -> Result<(), anyhow::Error> {
     log::debug!(
         "[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
         usage_buffer.len()
     );
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 9e760eb and b22c640.

📒 Files selected for processing (1)
  • src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
src/service/usage/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

@taimingl taimingl force-pushed the feat/usage-report-cfg branch from b22c640 to fe5a57d on November 5, 2024 13:03

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (3)
src/service/usage/mod.rs (2)

57-63: Consider using a thread pool for better resource management

Instead of spawning individual tasks for each thread, consider using a thread pool (e.g., tokio::runtime::Handle::current().spawn_blocking()) to better manage system resources and prevent potential thread explosion in case of misconfiguration.


Line range hint 524-559: Improve error handling in message processing

The current implementation silently ignores errors from ingest_buffered_usage. Consider adding error handling and potentially implementing a retry mechanism for failed ingestions.

-                            ingest_buffered_usage(thread_id, buffered).await;
+                            if let Err(e) = ingest_buffered_usage(thread_id, buffered).await {
+                                log::error!("[USAGE:JOB] thread_{thread_id} failed to ingest: {e}");
+                                // TODO: Implement retry mechanism
+                            }
src/config/src/config.rs (1)

893-894: Add documentation for the new configuration field.

Please add documentation for the usage_reporting_thread_num field using the help parameter in the env_config attribute to describe:

  • Purpose of the configuration
  • Default behavior (0 = half CPU cores in local mode, full CPU cores in distributed mode)
-    #[env_config(name = "ZO_USAGE_REPORTING_THREAD_NUM", default = 0)]
+    #[env_config(
+        name = "ZO_USAGE_REPORTING_THREAD_NUM",
+        default = 0,
+        help = "Number of threads for usage reporting. Default 0 means half of CPU cores in local mode, full CPU cores in distributed mode"
+    )]
     pub usage_reporting_thread_num: usize,
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b22c640 and fe5a57d.

📒 Files selected for processing (2)
  • src/config/src/config.rs (2 hunks)
  • src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
src/config/src/config.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

src/service/usage/mod.rs (1)

Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions

Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.

🔇 Additional comments (1)
src/config/src/config.rs (1)

1410-1417: LGTM! Thread initialization logic looks good.

The implementation correctly:

  • Sets thread count based on CPU cores
  • Handles both local and distributed modes
  • Ensures at least 1 thread with std::cmp::max

Let's verify the CPU core detection:

✅ Verification successful

Let me verify the CPU core detection implementation further by checking the cgroup logic:


CPU core detection and thread initialization implementation is robust and well-designed

The implementation is correct and comprehensive:

  • Properly detects CPU cores using both cgroup v1 and v2 mechanisms
  • Falls back to system CPU detection if cgroup limits aren't available
  • Ensures minimum thread counts (at least 2 CPU cores for general use)
  • Correctly handles thread allocation for both local mode (half of cores) and distributed mode (all cores)
  • Has proper error handling and fallbacks throughout the detection chain

The review comment's approval was accurate and the implementation is verified to be solid.
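
For illustration only, a minimal sketch of that fallback chain under cgroup v2 (reading cpu.max and falling back to the host CPU count); this is not the project's read_cpu_cgroup implementation, and the cgroup v1 path is omitted:

    use std::fs;

    // cpu.max holds "<quota> <period>"; quota is the literal "max" when unlimited.
    fn cgroup_v2_cpu_limit() -> Option<usize> {
        let s = fs::read_to_string("/sys/fs/cgroup/cpu.max").ok()?;
        let mut parts = s.split_whitespace();
        let quota: f64 = parts.next()?.parse().ok()?; // "max" fails to parse -> None
        let period: f64 = parts.next()?.parse().ok()?;
        Some((quota / period).ceil() as usize)
    }

    fn effective_cpu_num() -> usize {
        cgroup_v2_cpu_limit()
            .unwrap_or_else(|| {
                std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(1)
            })
            .max(1)
    }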

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify CPU core detection and thread calculation logic
# Test: Search for CPU core detection and thread calculations
rg "get_cpu_limit|usage_reporting_thread_num.*=.*cpu" -B 2 -A 2

Length of output: 1104


Script:

#!/bin/bash
# Check cgroup CPU detection implementation
rg "read_cpu_cgroup" -A 10

Length of output: 2206

Comment on lines +246 to +252
    for _ in 0..get_config().limit.usage_reporting_thread_num {
        let (res_sender, res_receiver) = oneshot::channel();
        if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
            log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
        }
        // wait for flush ingestion job
        res_receiver.await.ok();

⚠️ Potential issue

Potential deadlock in shutdown sequence

The current implementation waits for each thread's response sequentially. If any thread fails to respond, it could block the shutdown process indefinitely. Consider using tokio::time::timeout or collecting all responses and waiting with join_all.

-    for _ in 0..get_config().limit.usage_reporting_thread_num {
-        let (res_sender, res_receiver) = oneshot::channel();
-        if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
-            log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
-        }
-        // wait for flush ingestion job
-        res_receiver.await.ok();
-    }
+    let mut shutdown_handles = Vec::new();
+    for _ in 0..get_config().limit.usage_reporting_thread_num {
+        let (res_sender, res_receiver) = oneshot::channel();
+        if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
+            log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
+            continue;
+        }
+        shutdown_handles.push(res_receiver);
+    }
+    // Wait for all threads with timeout
+    for handle in shutdown_handles {
+        if let Err(e) = tokio::time::timeout(
+            std::time::Duration::from_secs(5),
+            handle
+        ).await {
+            log::error!("[USAGE] Timeout waiting for thread shutdown: {e}");
+        }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (resulting code):

    let mut shutdown_handles = Vec::new();
    for _ in 0..get_config().limit.usage_reporting_thread_num {
        let (res_sender, res_receiver) = oneshot::channel();
        if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
            log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
            continue;
        }
        shutdown_handles.push(res_receiver);
    }
    // Wait for all threads with timeout
    for handle in shutdown_handles {
        if let Err(e) = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            handle
        ).await {
            log::error!("[USAGE] Timeout waiting for thread shutdown: {e}");
        }
    }

Comment on lines +563 to +567
    async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
        log::debug!(
            "[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
            usage_buffer.len()
        );

🛠️ Refactor suggestion

Return Result for better error handling

The function should return a Result to properly propagate errors to the caller, enabling better error handling and potential retry mechanisms.

-async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
+async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) -> Result<(), anyhow::Error> {
     log::debug!(
         "[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
         usage_buffer.len()
     );
     let (mut usage_data, mut trigger_data) = (Vec::new(), Vec::new());
     for item in usage_buffer {
         match item {
             UsageBuffer::Usage(usage) => usage_data.push(*usage),
             UsageBuffer::Trigger(trigger) => trigger_data.push(*trigger),
         }
     }
     if !usage_data.is_empty() {
-        ingest_usages(usage_data).await;
+        ingest_usages(usage_data).await?;
     }
     if !trigger_data.is_empty() {
-        ingest_trigger_usages(trigger_data).await;
+        ingest_trigger_usages(trigger_data).await?;
     }
+    Ok(())
 }

Committable suggestion skipped: line range outside the PR's diff.

@taimingl taimingl merged commit 0c29118 into main Nov 5, 2024
28 checks passed
@taimingl taimingl deleted the feat/usage-report-cfg branch November 5, 2024 13:31