Skip to content

Commit 6bb82c0

Browse files
authored
fix(core): establish cpu baseline when possible to improve measurement accuracy (#34120)
## Current Behavior

New task processes show 0% CPU on their first measurement because no baseline exists. Accurate readings only appear on the second collection cycle (~1s later).

## Expected Behavior

New task processes get accurate CPU readings on their first measurement. The collector establishes CPU baselines for newly registered processes ~250ms before collection, giving `sysinfo` enough time to calculate accurate CPU deltas.

## Technical Details: Baselining & Collection Flow

The collection loop runs in 4 phases:

```
T=0ms      T=750ms        T=1000ms     T=1750ms       T=2000ms
  |           |              |            |              |
  v           v              v            v              v
Collect → Sleep(750ms) → Baseline → Sleep(250ms) → Collect → ...
```

1. **Collect**: Refresh all processes and gather metrics
2. **Post-collection sleep**: Wait until baseline time (interval - 250ms)
3. **Baseline**: Bulk CPU refresh for newly registered PIDs (if any)
4. **Pre-collection sleep**: Wait 250ms for accurate CPU delta calculation
1 parent 0137ea2 commit 6bb82c0

1 file changed

Lines changed: 65 additions & 3 deletions

File tree

packages/nx/src/native/metrics/collector.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::Result;
22
#[cfg(test)]
33
use crossbeam_channel::Receiver;
44
use crossbeam_channel::{Sender, unbounded};
5-
use dashmap::DashMap;
5+
use dashmap::{DashMap, DashSet};
66
use napi::{Env, JsFunction};
77
use napi_derive::napi;
88
use parking_lot::Mutex;
@@ -80,6 +80,8 @@ struct CollectionRunner {
8080
main_cli_pid: Arc<Mutex<Option<i32>>>,
8181
main_cli_subprocess_pids: Arc<DashMap<i32, Option<String>>>,
8282
daemon_pid: Arc<Mutex<Option<i32>>>,
83+
/// PIDs registered since last baseline refresh (lock-free concurrent set)
84+
pids_needing_baseline: Arc<DashSet<i32>>,
8385

8486
// Collection infrastructure
8587
system: Arc<Mutex<System>>,
@@ -104,6 +106,7 @@ impl CollectionRunner {
104106
main_cli_pid: Arc::clone(&collector.main_cli_pid),
105107
main_cli_subprocess_pids: Arc::clone(&collector.main_cli_subprocess_pids),
106108
daemon_pid: Arc::clone(&collector.daemon_pid),
109+
pids_needing_baseline: Arc::clone(&collector.pids_needing_baseline),
107110
system: Arc::clone(&collector.system),
108111
config: collector.config.clone(),
109112
process_metadata_map: Arc::clone(&collector.process_metadata_map),
@@ -112,9 +115,43 @@ impl CollectionRunner {
112115
}
113116
}
114117

118+
/// Establish CPU baselines for all processes when there are processes needing a baseline.
119+
/// Uses bulk refresh to keep all process timing in sync (avoids per-PID refresh bug).
120+
/// Returns true if baseline was actually performed, false if skipped (nothing to baseline).
121+
fn run_baseline_if_needed(&self) -> bool {
122+
if self.pids_needing_baseline.is_empty() {
123+
return false;
124+
}
125+
126+
trace!("New processes need baseline, running bulk CPU refresh");
127+
128+
{
129+
let mut sys = self.system.lock();
130+
sys.refresh_processes_specifics(
131+
sysinfo::ProcessesToUpdate::All,
132+
false, // don't remove dead processes; collection handles cleanup
133+
ProcessRefreshKind::nothing().with_cpu(),
134+
);
135+
} // release system lock before clearing set
136+
137+
// Clear the set: bulk refresh establishes baselines for all processes
138+
self.pids_needing_baseline.clear();
139+
trace!("Baseline refresh complete");
140+
true
141+
}
142+
115143
/// Run the collection loop
116144
fn run(self) {
117145
let interval = Duration::from_millis(self.config.collection_interval_ms);
146+
// sysinfo's MINIMUM_CPU_UPDATE_INTERVAL + 50ms safety buffer
147+
let baseline_offset = sysinfo::MINIMUM_CPU_UPDATE_INTERVAL + Duration::from_millis(50);
148+
let post_collection_sleep = interval - baseline_offset;
149+
150+
// First iteration: baseline if needed, then wait before first collection
151+
if self.should_collect.load(Ordering::Acquire) && self.run_baseline_if_needed() {
152+
// Sleep to allow CPU to be calculated correctly for the baselined processes
153+
self.sleep_with_early_exit(baseline_offset);
154+
}
118155

119156
while self.should_collect.load(Ordering::Acquire) {
120157
// Collect current metrics and send to main collector thread
@@ -123,8 +160,15 @@ impl CollectionRunner {
123160
.map(|result| self.send_metrics(result))
124161
.ok();
125162

126-
// Sleep in small chunks so thread can exit quickly on shutdown
127-
self.sleep_with_early_exit(interval);
163+
// Sleep after collection, before baseline
164+
self.sleep_with_early_exit(post_collection_sleep);
165+
if !self.should_collect.load(Ordering::Acquire) {
166+
break;
167+
}
168+
169+
self.run_baseline_if_needed();
170+
// Sleep until next collection (offset)
171+
self.sleep_with_early_exit(baseline_offset);
128172
}
129173

130174
self.is_collecting.store(false, Ordering::Release);
@@ -690,6 +734,10 @@ impl CollectionRunner {
690734
daemon_pid_to_clear,
691735
} = self.refresh_and_collect_metrics();
692736

737+
// Collection's bulk refresh established CPU baselines for all processes,
738+
// so clear the tracking set to avoid redundant baseline refreshes
739+
self.pids_needing_baseline.clear();
740+
693741
// Now that system lock is released, clear daemon PID if needed
694742
// This avoids holding system lock while acquiring daemon_pid lock
695743
if let Some(_pid) = daemon_pid_to_clear {
@@ -786,6 +834,8 @@ pub struct ProcessMetricsCollector {
786834
main_cli_subprocess_pids: Arc<DashMap<i32, Option<String>>>,
787835
/// Daemon process PID (can be updated when daemon connects)
788836
daemon_pid: Arc<Mutex<Option<i32>>>,
837+
/// PIDs registered since last baseline refresh (lock-free concurrent set)
838+
pids_needing_baseline: Arc<DashSet<i32>>,
789839
/// Cached CPU core count (set once at initialization)
790840
cpu_cores: u32,
791841
/// Cached total memory in bytes (set once at initialization)
@@ -837,6 +887,7 @@ impl ProcessMetricsCollector {
837887
main_cli_pid: Arc::new(Mutex::new(None)),
838888
main_cli_subprocess_pids: Arc::new(DashMap::new()),
839889
daemon_pid: Arc::new(Mutex::new(None)),
890+
pids_needing_baseline: Arc::new(DashSet::new()),
840891
cpu_cores,
841892
total_memory,
842893
system: Arc::new(Mutex::new(sys)),
@@ -1063,6 +1114,8 @@ impl ProcessMetricsCollector {
10631114
pub fn register_main_cli_process(&self, pid: i32) {
    // Records `pid` as the main CLI process and queues it for a CPU baseline refresh.
    trace!("Registering main CLI process: pid={}", pid);

    {
        // Hold the lock only for the assignment itself.
        let mut main_pid_slot = self.main_cli_pid.lock();
        *main_pid_slot = Some(pid);
    }

    // A freshly registered PID has no CPU baseline yet; mark it so the first CPU
    // reading can be accurate.
    self.pids_needing_baseline.insert(pid);

    trace!("Main CLI process registered: pid={}", pid);
}
10681121

@@ -1074,6 +1127,8 @@ impl ProcessMetricsCollector {
10741127
pid, alias
10751128
);
10761129
self.main_cli_subprocess_pids.insert(pid, alias);
1130+
// Track that this PID needs a baseline for accurate first CPU reading
1131+
self.pids_needing_baseline.insert(pid);
10771132
trace!("Main CLI subprocess registered: pid={}", pid);
10781133
}
10791134

@@ -1082,6 +1137,8 @@ impl ProcessMetricsCollector {
10821137
pub fn register_daemon_process(&self, pid: i32) {
10831138
let mut daemon_pid = self.daemon_pid.lock();
10841139
*daemon_pid = Some(pid);
1140+
// Track that this PID needs a baseline for accurate first CPU reading
1141+
self.pids_needing_baseline.insert(pid);
10851142
}
10861143

10871144
/// Register a process for a specific task
@@ -1094,6 +1151,8 @@ impl ProcessMetricsCollector {
10941151
.or_insert_with(|| IndividualTaskRegistration::new(task_id.clone()))
10951152
.anchor_pids
10961153
.insert(pid);
1154+
// Track that this PID needs a baseline for accurate first CPU reading
1155+
self.pids_needing_baseline.insert(pid);
10971156
trace!("Task process registered: task_id={}, pid={}", task_id, pid);
10981157
}
10991158

@@ -1110,6 +1169,8 @@ impl ProcessMetricsCollector {
11101169
batch_id.clone(),
11111170
BatchRegistration::new(batch_id.clone(), task_ids, pid),
11121171
);
1172+
// Track that this PID needs a baseline for accurate first CPU reading
1173+
self.pids_needing_baseline.insert(pid);
11131174
trace!("Batch registered: batch_id={}, pid={}", batch_id, pid);
11141175
}
11151176

@@ -1163,6 +1224,7 @@ mod tests {
11631224
main_cli_pid: Arc::new(Mutex::new(None)),
11641225
main_cli_subprocess_pids: Arc::new(DashMap::new()),
11651226
daemon_pid: Arc::new(Mutex::new(None)),
1227+
pids_needing_baseline: Arc::new(DashSet::new()),
11661228
system: Arc::new(Mutex::new(System::new())),
11671229
config: CollectorConfig::default(),
11681230
process_metadata_map: Arc::new(DashMap::new()),

0 commit comments

Comments
 (0)