feat: configurable thread number for usage reporting job #4988
Conversation
Walkthrough: This pull request introduces a new public field, usage_reporting_thread_num, to the configuration, making the number of threads used by the background usage reporting job configurable.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (2)
src/config/src/config.rs (1)
893-894: Add documentation for the new configuration field. The field usage_reporting_thread_num would benefit from documentation that explains:
- Its purpose for configuring usage reporting threads
- The meaning of the default value 0 (auto-configuration based on CPU cores)
- The expected range of valid values
Add rustdoc comments like this:
+/// Number of threads to handle background usage reporting jobs.
+/// Default value of 0 means:
+/// - In local mode: Use half of the available CPU cores
+/// - In distributed mode: Use all available CPU cores
+/// Valid values: 0 or any positive integer
 #[env_config(name = "ZO_USAGE_REPORTING_THREAD_NUM", default = 0)]
 pub usage_reporting_thread_num: usize,

src/service/usage/mod.rs (1)
256-256: Improve log message clarity. The log message "Returning as no usages reported " could be more informative. Including the function name or additional context can aid in debugging and improve log traceability. Apply this diff to enhance the log message:

-log::info!("Returning as no usages reported ");
+log::info!("ingest_usages: No usage data to process.");
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/config/src/config.rs (2 hunks)
src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
src/config/src/config.rs (1)
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
src/service/usage/mod.rs (1)
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
🔇 Additional comments (1)
src/config/src/config.rs (1)
1410-1417: LGTM! Thread initialization logic is well-implemented.
The initialization logic correctly:
- Sets thread count based on local/distributed mode
- Ensures at least 1 thread using std::cmp::max (see the sketch below)
- Follows the same pattern as other thread configurations
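To make the defaulting behavior concrete, here is a minimal, hedged sketch of how the 0-means-auto resolution could look. It is not the repository's exact code; the function name and parameters are illustrative, and only the documented rules (half the cores in local mode, all cores in distributed mode, never fewer than 1) are taken from the PR.

```rust
// Illustrative only: resolve the effective usage-reporting thread count
// when ZO_USAGE_REPORTING_THREAD_NUM is left at its default of 0.
fn resolve_usage_reporting_threads(configured: usize, cpu_num: usize, local_mode: bool) -> usize {
    if configured > 0 {
        // An explicit value always wins.
        return configured;
    }
    // 0 means "auto": half the cores locally, all cores in distributed mode.
    let derived = if local_mode { cpu_num / 2 } else { cpu_num };
    // Guarantee at least one worker thread, even on single-core hosts.
    std::cmp::max(1, derived)
}
```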
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
src/service/usage/mod.rs (3)
57-63: Enhance thread initialization logging. Add more context to the log message to help with monitoring and debugging:

+ log::info!(
+     "[USAGE] Initializing {} usage reporting threads with batch_size={}, timeout={}s",
+     cfg.limit.usage_reporting_thread_num,
+     batch_size,
+     timeout.as_secs()
+ );
  for thread_id in 0..cfg.limit.usage_reporting_thread_num {
524-533: Improve error handling in message processing. The current implementation silently continues on error. Consider adding error context and metrics:

 let mut msg_receiver = msg_receiver.lock().await;
 tokio::select! {
     msg = msg_receiver.recv() => {
         match msg {
             Some(UsageMessage::Data(usage_buf)) => {
                 log::debug!("[USAGE:JOB] thread_{thread_id} received and queued {} messages.", usage_buf.len());
                 usage_report_runner.push(usage_buf);
                 if usage_report_runner.should_process() {
                     let buffered = usage_report_runner.take_batch();
-                    ingest_buffered_usage(thread_id, buffered).await;
+                    if let Err(e) = ingest_buffered_usage(thread_id, buffered).await {
+                        log::error!("[USAGE:JOB] thread_{thread_id} failed to ingest buffered usage: {}", e);
+                        metrics::USAGE_INGEST_ERRORS.inc();
+                    }
                 }
             }
563-567: Return Result from ingest_buffered_usage for proper error handling. The function should propagate errors to allow the caller to handle them appropriately:

-async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
+async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) -> Result<(), anyhow::Error> {
     log::debug!(
         "[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
         usage_buffer.len()
     );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
src/service/usage/mod.rs (1)
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
Force-pushed from b22c640 to fe5a57d (Compare)
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
src/service/usage/mod.rs (2)
57-63: Consider using a thread pool for better resource management. Instead of spawning individual tasks for each thread, consider using a thread pool (e.g., tokio::runtime::Handle::current().spawn_blocking()) to better manage system resources and prevent potential thread explosion in case of misconfiguration.
Line range hint 524-559: Improve error handling in message processing. The current implementation silently ignores errors from ingest_buffered_usage. Consider adding error handling and potentially implementing a retry mechanism for failed ingestions.

-    ingest_buffered_usage(thread_id, buffered).await;
+    if let Err(e) = ingest_buffered_usage(thread_id, buffered).await {
+        log::error!("[USAGE:JOB] thread_{thread_id} failed to ingest: {e}");
+        // TODO: Implement retry mechanism
+    }

src/config/src/config.rs (1)
893-894: Add documentation for the new configuration field. Please add documentation for the usage_reporting_thread_num field using the help parameter in the env_config attribute to describe:
- Purpose of the configuration
- Default behavior (0 = half CPU cores in local mode, full CPU cores in distributed mode)
-    #[env_config(name = "ZO_USAGE_REPORTING_THREAD_NUM", default = 0)]
+    #[env_config(
+        name = "ZO_USAGE_REPORTING_THREAD_NUM",
+        default = 0,
+        help = "Number of threads for usage reporting. Default 0 means half of CPU cores in local mode, full CPU cores in distributed mode"
+    )]
     pub usage_reporting_thread_num: usize,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/config/src/config.rs (2 hunks)
src/service/usage/mod.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
src/config/src/config.rs (1)
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
src/service/usage/mod.rs (1)
Pattern **/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestions
Make sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
🔇 Additional comments (1)
src/config/src/config.rs (1)
1410-1417: LGTM! Thread initialization logic looks good.
The implementation correctly:
- Sets thread count based on CPU cores
- Handles both local and distributed modes
- Ensures at least 1 thread with std::cmp::max
Let's verify the CPU core detection:
✅ Verification successful
Let me verify the CPU core detection implementation further by checking the cgroup logic:
CPU core detection and thread initialization implementation is robust and well-designed
The implementation is correct and comprehensive:
- Properly detects CPU cores using both cgroup v1 and v2 mechanisms
- Falls back to system CPU detection if cgroup limits aren't available
- Ensures minimum thread counts (at least 2 CPU cores for general use)
- Correctly handles thread allocation for both local mode (half of cores) and distributed mode (all cores)
- Has proper error handling and fallbacks throughout the detection chain
The review comment's approval was accurate and the implementation is verified to be solid.
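As a reference point for the detection chain verified above, here is a hedged sketch of what cgroup-aware CPU detection with a system fallback can look like. It is not the repository's read_cpu_cgroup implementation; the file paths, rounding, and fallback choice are assumptions.

```rust
use std::fs;

// Illustrative only: derive a CPU limit from cgroup v2, then cgroup v1,
// then fall back to what the OS reports.
fn detect_cpu_limit() -> usize {
    // cgroup v2: cpu.max contains "<quota> <period>" or "max <period>".
    if let Ok(s) = fs::read_to_string("/sys/fs/cgroup/cpu.max") {
        let mut parts = s.split_whitespace();
        if let (Some(quota), Some(period)) = (parts.next(), parts.next()) {
            if let (Ok(q), Ok(p)) = (quota.parse::<f64>(), period.parse::<f64>()) {
                if q > 0.0 && p > 0.0 {
                    return (q / p).ceil() as usize;
                }
            }
            // "max" does not parse as a number, so we fall through to the next check.
        }
    }
    // cgroup v1: quota and period live in separate files; a quota of -1 means unlimited.
    if let (Ok(q), Ok(p)) = (
        fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"),
        fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_period_us"),
    ) {
        if let (Ok(q), Ok(p)) = (q.trim().parse::<i64>(), p.trim().parse::<i64>()) {
            if q > 0 && p > 0 {
                return ((q as f64 / p as f64).ceil()) as usize;
            }
        }
    }
    // Fallback: the OS-reported parallelism, never less than 1.
    std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```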
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify CPU core detection and thread calculation logic
# Test: Search for CPU core detection and thread calculations
rg "get_cpu_limit|usage_reporting_thread_num.*=.*cpu" -B 2 -A 2
Length of output: 1104
Script:
#!/bin/bash
# Check cgroup CPU detection implementation
rg "read_cpu_cgroup" -A 10
Length of output: 2206
for _ in 0..get_config().limit.usage_reporting_thread_num {
    let (res_sender, res_receiver) = oneshot::channel();
    if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
        log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
    }
    // wait for flush ingestion job
    res_receiver.await.ok();
Potential deadlock in shutdown sequence
The current implementation waits for each thread's response sequentially. If any thread fails to respond, it could block the shutdown process indefinitely. Consider using tokio::time::timeout or collecting all responses and waiting with join_all.
- for _ in 0..get_config().limit.usage_reporting_thread_num {
- let (res_sender, res_receiver) = oneshot::channel();
- if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
- log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
- }
- // wait for flush ingestion job
- res_receiver.await.ok();
- }
+ let mut shutdown_handles = Vec::new();
+ for _ in 0..get_config().limit.usage_reporting_thread_num {
+ let (res_sender, res_receiver) = oneshot::channel();
+ if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
+ log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
+ continue;
+ }
+ shutdown_handles.push(res_receiver);
+ }
+ // Wait for all threads with timeout
+ for handle in shutdown_handles {
+ if let Err(e) = tokio::time::timeout(
+ std::time::Duration::from_secs(5),
+ handle
+ ).await {
+ log::error!("[USAGE] Timeout waiting for thread shutdown: {e}");
+ }
+ }

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut shutdown_handles = Vec::new();
for _ in 0..get_config().limit.usage_reporting_thread_num {
    let (res_sender, res_receiver) = oneshot::channel();
    if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
        log::error!("[USAGE] Error shutting down USAGE_QUEUER: {e}");
        continue;
    }
    shutdown_handles.push(res_receiver);
}
// Wait for all threads with timeout
for handle in shutdown_handles {
    if let Err(e) = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        handle
    ).await {
        log::error!("[USAGE] Timeout waiting for thread shutdown: {e}");
    }
}
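For the join_all alternative mentioned in the comment above, here is a hedged sketch. It assumes the acknowledgement payload is `()` and uses an overall 5-second budget, neither of which is taken from the PR; the receivers would be collected exactly as in the suggestion above.

```rust
use futures::future::join_all;
use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};

// Illustrative only: once the per-thread acknowledgement receivers have been
// collected, wait for all of them together under a single overall deadline
// instead of awaiting each one sequentially.
async fn wait_for_flush(receivers: Vec<oneshot::Receiver<()>>) {
    match timeout(Duration::from_secs(5), join_all(receivers)).await {
        Ok(results) => {
            for res in results {
                if res.is_err() {
                    log::warn!("[USAGE] A usage reporting thread dropped its ack sender");
                }
            }
        }
        Err(_) => log::error!("[USAGE] Timed out waiting for usage reporting threads to flush"),
    }
}
```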
async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
    log::debug!(
        "[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
        usage_buffer.len()
    );
🛠️ Refactor suggestion
Return Result for better error handling
The function should return a Result to properly propagate errors to the caller, enabling better error handling and potential retry mechanisms.
-async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) {
+async fn ingest_buffered_usage(thread_id: usize, usage_buffer: Vec<UsageBuffer>) -> Result<(), anyhow::Error> {
log::debug!(
"[USAGE:JOB] thread_{thread_id} ingests {} buffered usage data",
usage_buffer.len()
);
let (mut usage_data, mut trigger_data) = (Vec::new(), Vec::new());
for item in usage_buffer {
match item {
UsageBuffer::Usage(usage) => usage_data.push(*usage),
UsageBuffer::Trigger(trigger) => trigger_data.push(*trigger),
}
}
if !usage_data.is_empty() {
- ingest_usages(usage_data).await;
+ ingest_usages(usage_data).await?;
}
if !trigger_data.is_empty() {
- ingest_trigger_usages(trigger_data).await;
+ ingest_trigger_usages(trigger_data).await?;
}
+ Ok(())
 }

Committable suggestion skipped: line range outside the PR's diff.
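Since the comment above points at retries as one motivation for returning a Result, here is a minimal, hedged sketch of a retry wrapper. The function name, attempt count, and backoff are illustrative and not part of the PR.

```rust
use std::time::Duration;

// Illustrative only: retry a fallible async ingest a bounded number of times
// with exponential backoff before giving up.
async fn ingest_with_retry<F, Fut>(mut ingest: F, max_attempts: u32) -> Result<(), anyhow::Error>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), anyhow::Error>>,
{
    let mut attempt = 0u32;
    loop {
        match ingest().await {
            Ok(()) => return Ok(()),
            Err(e) if attempt + 1 < max_attempts => {
                attempt += 1;
                log::warn!("[USAGE:JOB] ingest attempt {attempt} failed: {e}, retrying");
                tokio::time::sleep(Duration::from_secs(1u64 << attempt)).await;
            }
            // Out of attempts: surface the last error to the caller.
            Err(e) => return Err(e),
        }
    }
}
```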
Implements #4974.

Environment variables to configure usage reporting jobs:

- ZO_USAGE_REPORTING_THREAD_NUM: number of threads handling the background usage reporting job. Default: 0, which resolves to cpu_num / 2 in local_mode and cpu_num in distributed mode.
- ZO_USAGE_BATCH_SIZE: number of usage records buffered in memory before the buffer is ingested into the usage org. Default: 2,000, applied to each thread.
- ZO_USAGE_PUBLISH_INTERVAL: maximum number of seconds between ingestions, regardless of whether ZO_USAGE_BATCH_SIZE has been reached. Default: 60s, applied to each thread. (See the sketch below for how the batch size and interval interact per thread.)
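A hedged sketch of the per-thread batching behavior these two knobs describe is below. The UsageEvent type, the flush helper, and the channel wiring are placeholders rather than the crate's actual types; only the flush-on-batch-size-or-interval semantics come from the description above.

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

struct UsageEvent; // placeholder for the real usage record type

// Placeholder sink standing in for ingestion into the usage org.
async fn flush(batch: Vec<UsageEvent>) {
    log::debug!("flushing {} buffered usage records", batch.len());
}

// Illustrative per-thread loop: flush when the buffer reaches the batch size
// (ZO_USAGE_BATCH_SIZE) or when the publish interval (ZO_USAGE_PUBLISH_INTERVAL)
// elapses, whichever happens first.
async fn flush_loop(mut rx: mpsc::Receiver<UsageEvent>, batch_size: usize, publish_interval_secs: u64) {
    let mut buffer: Vec<UsageEvent> = Vec::with_capacity(batch_size);
    let mut ticker = interval(Duration::from_secs(publish_interval_secs));
    loop {
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(event) => {
                        buffer.push(event);
                        if buffer.len() >= batch_size {
                            flush(std::mem::take(&mut buffer)).await;
                        }
                    }
                    // Channel closed: flush whatever is left and stop.
                    None => {
                        if !buffer.is_empty() {
                            flush(std::mem::take(&mut buffer)).await;
                        }
                        break;
                    }
                }
            }
            _ = ticker.tick() => {
                if !buffer.is_empty() {
                    flush(std::mem::take(&mut buffer)).await;
                }
            }
        }
    }
}
```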
Summary by CodeRabbit
New Features
Bug Fixes
Documentation