@@ -2,7 +2,7 @@ use anyhow::Result;
22#[ cfg( test) ]
33use crossbeam_channel:: Receiver ;
44use crossbeam_channel:: { Sender , unbounded} ;
5- use dashmap:: DashMap ;
5+ use dashmap:: { DashMap , DashSet } ;
66use napi:: { Env , JsFunction } ;
77use napi_derive:: napi;
88use parking_lot:: Mutex ;
@@ -80,6 +80,8 @@ struct CollectionRunner {
8080 main_cli_pid : Arc < Mutex < Option < i32 > > > ,
8181 main_cli_subprocess_pids : Arc < DashMap < i32 , Option < String > > > ,
8282 daemon_pid : Arc < Mutex < Option < i32 > > > ,
83+ /// PIDs registered since last baseline refresh (sharded concurrent set; shared across threads without an outer lock)
84+ pids_needing_baseline : Arc < DashSet < i32 > > ,
8385
8486 // Collection infrastructure
8587 system : Arc < Mutex < System > > ,
@@ -104,6 +106,7 @@ impl CollectionRunner {
104106 main_cli_pid : Arc :: clone ( & collector. main_cli_pid ) ,
105107 main_cli_subprocess_pids : Arc :: clone ( & collector. main_cli_subprocess_pids ) ,
106108 daemon_pid : Arc :: clone ( & collector. daemon_pid ) ,
109+ pids_needing_baseline : Arc :: clone ( & collector. pids_needing_baseline ) ,
107110 system : Arc :: clone ( & collector. system ) ,
108111 config : collector. config . clone ( ) ,
109112 process_metadata_map : Arc :: clone ( & collector. process_metadata_map ) ,
@@ -112,9 +115,43 @@ impl CollectionRunner {
112115 }
113116 }
114117
118+ /// Establish CPU baselines for all processes when there are processes needing a baseline.
119+ /// Uses bulk refresh to keep all process timing in sync (avoids per-PID refresh bug).
120+ /// Returns true if baseline was actually performed, false if skipped (nothing to baseline).
121+ fn run_baseline_if_needed ( & self ) -> bool {
122+ if self . pids_needing_baseline . is_empty ( ) {
123+ return false ;
124+ }
125+
126+ trace ! ( "New processes need baseline, running bulk CPU refresh" ) ;
127+
128+ {
129+ let mut sys = self . system . lock ( ) ;
130+ sys. refresh_processes_specifics (
131+ sysinfo:: ProcessesToUpdate :: All ,
132+ false , // don't remove dead processes; collection handles cleanup
133+ ProcessRefreshKind :: nothing ( ) . with_cpu ( ) ,
134+ ) ;
135+ } // release system lock before clearing set
136+
137+ // Clear the set: bulk refresh establishes baselines for all processes
138+ self . pids_needing_baseline . clear ( ) ;
139+ trace ! ( "Baseline refresh complete" ) ;
140+ true
141+ }
142+
115143 /// Run the collection loop
116144 fn run ( self ) {
117145 let interval = Duration :: from_millis ( self . config . collection_interval_ms ) ;
146+ // sysinfo's MINIMUM_CPU_UPDATE_INTERVAL + 50ms safety buffer
147+ let baseline_offset = sysinfo:: MINIMUM_CPU_UPDATE_INTERVAL + Duration :: from_millis ( 50 ) ;
148+ let post_collection_sleep = interval - baseline_offset;
149+
150+ // First iteration: baseline if needed, then wait before first collection
151+ if self . should_collect . load ( Ordering :: Acquire ) && self . run_baseline_if_needed ( ) {
152+ // Sleep to allow CPU to be calculated correctly for the baselined processes
153+ self . sleep_with_early_exit ( baseline_offset) ;
154+ }
118155
119156 while self . should_collect . load ( Ordering :: Acquire ) {
120157 // Collect current metrics and send to main collector thread
@@ -123,8 +160,15 @@ impl CollectionRunner {
123160 . map ( |result| self . send_metrics ( result) )
124161 . ok ( ) ;
125162
126- // Sleep in small chunks so thread can exit quickly on shutdown
127- self . sleep_with_early_exit ( interval) ;
163+ // Sleep after collection, before baseline
164+ self . sleep_with_early_exit ( post_collection_sleep) ;
165+ if !self . should_collect . load ( Ordering :: Acquire ) {
166+ break ;
167+ }
168+
169+ self . run_baseline_if_needed ( ) ;
170+ // Sleep until next collection (offset)
171+ self . sleep_with_early_exit ( baseline_offset) ;
128172 }
129173
130174 self . is_collecting . store ( false , Ordering :: Release ) ;
@@ -690,6 +734,10 @@ impl CollectionRunner {
690734 daemon_pid_to_clear,
691735 } = self . refresh_and_collect_metrics ( ) ;
692736
737+ // Collection's bulk refresh established CPU baselines for all processes,
738+ // so clear the tracking set to avoid redundant baseline refreshes
739+ self . pids_needing_baseline . clear ( ) ;
740+
693741 // Now that system lock is released, clear daemon PID if needed
694742 // This avoids holding system lock while acquiring daemon_pid lock
695743 if let Some ( _pid) = daemon_pid_to_clear {
@@ -786,6 +834,8 @@ pub struct ProcessMetricsCollector {
786834 main_cli_subprocess_pids : Arc < DashMap < i32 , Option < String > > > ,
787835 /// Daemon process PID (can be updated when daemon connects)
788836 daemon_pid : Arc < Mutex < Option < i32 > > > ,
837+ /// PIDs registered since last baseline refresh (sharded concurrent set; shared across threads without an outer lock)
838+ pids_needing_baseline : Arc < DashSet < i32 > > ,
789839 /// Cached CPU core count (set once at initialization)
790840 cpu_cores : u32 ,
791841 /// Cached total memory in bytes (set once at initialization)
@@ -837,6 +887,7 @@ impl ProcessMetricsCollector {
837887 main_cli_pid : Arc :: new ( Mutex :: new ( None ) ) ,
838888 main_cli_subprocess_pids : Arc :: new ( DashMap :: new ( ) ) ,
839889 daemon_pid : Arc :: new ( Mutex :: new ( None ) ) ,
890+ pids_needing_baseline : Arc :: new ( DashSet :: new ( ) ) ,
840891 cpu_cores,
841892 total_memory,
842893 system : Arc :: new ( Mutex :: new ( sys) ) ,
@@ -1063,6 +1114,8 @@ impl ProcessMetricsCollector {
10631114 pub fn register_main_cli_process ( & self , pid : i32 ) {
10641115 trace ! ( "Registering main CLI process: pid={}" , pid) ;
10651116 * self . main_cli_pid . lock ( ) = Some ( pid) ;
1117+ // Track that this PID needs a baseline for accurate first CPU reading
1118+ self . pids_needing_baseline . insert ( pid) ;
10661119 trace ! ( "Main CLI process registered: pid={}" , pid) ;
10671120 }
10681121
@@ -1074,6 +1127,8 @@ impl ProcessMetricsCollector {
10741127 pid, alias
10751128 ) ;
10761129 self . main_cli_subprocess_pids . insert ( pid, alias) ;
1130+ // Track that this PID needs a baseline for accurate first CPU reading
1131+ self . pids_needing_baseline . insert ( pid) ;
10771132 trace ! ( "Main CLI subprocess registered: pid={}" , pid) ;
10781133 }
10791134
@@ -1082,6 +1137,8 @@ impl ProcessMetricsCollector {
10821137 pub fn register_daemon_process ( & self , pid : i32 ) {
10831138 let mut daemon_pid = self . daemon_pid . lock ( ) ;
10841139 * daemon_pid = Some ( pid) ;
1140+ // Track that this PID needs a baseline for accurate first CPU reading
1141+ self . pids_needing_baseline . insert ( pid) ;
10851142 }
10861143
10871144 /// Register a process for a specific task
@@ -1094,6 +1151,8 @@ impl ProcessMetricsCollector {
10941151 . or_insert_with ( || IndividualTaskRegistration :: new ( task_id. clone ( ) ) )
10951152 . anchor_pids
10961153 . insert ( pid) ;
1154+ // Track that this PID needs a baseline for accurate first CPU reading
1155+ self . pids_needing_baseline . insert ( pid) ;
10971156 trace ! ( "Task process registered: task_id={}, pid={}" , task_id, pid) ;
10981157 }
10991158
@@ -1110,6 +1169,8 @@ impl ProcessMetricsCollector {
11101169 batch_id. clone ( ) ,
11111170 BatchRegistration :: new ( batch_id. clone ( ) , task_ids, pid) ,
11121171 ) ;
1172+ // Track that this PID needs a baseline for accurate first CPU reading
1173+ self . pids_needing_baseline . insert ( pid) ;
11131174 trace ! ( "Batch registered: batch_id={}, pid={}" , batch_id, pid) ;
11141175 }
11151176
@@ -1163,6 +1224,7 @@ mod tests {
11631224 main_cli_pid : Arc :: new ( Mutex :: new ( None ) ) ,
11641225 main_cli_subprocess_pids : Arc :: new ( DashMap :: new ( ) ) ,
11651226 daemon_pid : Arc :: new ( Mutex :: new ( None ) ) ,
1227+ pids_needing_baseline : Arc :: new ( DashSet :: new ( ) ) ,
11661228 system : Arc :: new ( Mutex :: new ( System :: new ( ) ) ) ,
11671229 config : CollectorConfig :: default ( ) ,
11681230 process_metadata_map : Arc :: new ( DashMap :: new ( ) ) ,
0 commit comments