@@ -68,8 +68,8 @@ pub struct AshSnapshot {
6868#[ derive( Debug , Clone ) ]
6969pub struct PgAshInfo {
7070 pub installed : bool ,
71- /// Retention window in seconds from `ash.config`. Reserved for history
72- /// mode (Layer 2); unused in the current live-only implementation .
71+ /// Retention window in seconds from `ash.config`. Used by history mode
72+ /// to determine how far back to query `ash.sample` .
7373 #[ allow( dead_code) ]
7474 pub retention_seconds : Option < i64 > ,
7575}
@@ -168,17 +168,11 @@ pub async fn live_snapshot(client: &Client) -> anyhow::Result<AshSnapshot> {
168168 Ok ( snap)
169169}
170170
171- /// Return historical snapshots from `pg_ash` if installed .
171+ /// Return historical snapshots from `pg_ash` for an explicit time range .
172172///
173- /// # Stub — history mode (`pg_ash` Layer 2) not yet implemented
174- ///
175- /// TODO: history mode (`pg_ash` Layer 2) — not yet implemented.
176- /// `pg_ash` v1.2 encodes `ash.samples.data` as an opaque `int[]` whose
177- /// layout is not yet publicly documented. Until the encoding is specified
178- /// and history mode is fully wired into the event loop, this function always
179- /// returns an empty vec. The caller in `mod.rs` falls back to the live ring
180- /// buffer transparently, so the TUI never goes blank.
181- /// Track upstream: <https://github.com/NikolayS/rpg/issues/753>
173+ /// Uses `ash.wait_timeline()` to fetch time-bucketed wait event data, then
174+ /// converts each bucket into an `AshSnapshot`. Falls back to an empty vec
175+ /// when `pg_ash` is not installed or the query fails.
182176pub async fn history_snapshots (
183177 client : & Client ,
184178 from : SystemTime ,
@@ -189,12 +183,110 @@ pub async fn history_snapshots(
189183 return Ok ( vec ! [ ] ) ;
190184 }
191185
192- // Validate range (suppress unused-variable warnings until encoding is done).
193- let _ = ( from, to) ;
186+ let from_epoch = from
187+ . duration_since ( UNIX_EPOCH )
188+ . map_or ( 0 , |d| i64:: try_from ( d. as_secs ( ) ) . unwrap_or ( i64:: MAX ) ) ;
189+ let to_epoch = to
190+ . duration_since ( UNIX_EPOCH )
191+ . map_or ( 0 , |d| i64:: try_from ( d. as_secs ( ) ) . unwrap_or ( i64:: MAX ) ) ;
192+
193+ let window_secs = to_epoch. saturating_sub ( from_epoch) . max ( 1 ) ;
194+ let interval = format ! ( "{window_secs} seconds" ) ;
195+
196+ query_ash_history_interval ( client, & interval) . await
197+ }
198+
199+ /// Pre-populate the ring buffer with historical snapshots from `pg_ash`.
200+ ///
201+ /// Queries `ash.wait_timeline()` with 1-second buckets for the requested
202+ /// window, groups each bucket into an `AshSnapshot`, and returns them in
203+ /// chronological order (oldest first).
204+ ///
205+ /// Returns an empty vec when:
206+ /// - `pg_ash` is not installed (graceful degradation)
207+ /// - the query fails (transient error, permission issue, etc.)
208+ /// - no historical data exists for the requested window
209+ pub async fn query_ash_history ( client : & Client , window_secs : u64 ) -> Vec < AshSnapshot > {
210+ let interval = format ! ( "{window_secs} seconds" ) ;
211+ query_ash_history_interval ( client, & interval)
212+ . await
213+ . unwrap_or_default ( )
214+ }
215+
216+ /// Shared implementation for history queries.
217+ ///
218+ /// Uses `ash.wait_timeline(interval, '1 second')` which returns
219+ /// `(bucket_start timestamptz, wait_event text, samples bigint)`
220+ /// already decoded from the opaque `int[]` encoding.
221+ async fn query_ash_history_interval (
222+ client : & Client ,
223+ interval : & str ,
224+ ) -> anyhow:: Result < Vec < AshSnapshot > > {
225+ // ash.wait_timeline returns (bucket_start, wait_event, samples).
226+ // wait_event format: "Type:Event" or just "Type" when type == event
227+ // (e.g. "CPU*", "IO:DataFileRead", "Lock:relation").
228+ let sql = format ! (
229+ "select \
230+ extract(epoch from bucket_start)::int8 as ts, \
231+ wait_event, \
232+ samples::int4 as cnt \
233+ from ash.wait_timeline('{interval}'::interval, '1 second'::interval) \
234+ order by bucket_start, wait_event"
235+ ) ;
236+
237+ let Ok ( rows) = client. query ( & sql, & [ ] ) . await else {
238+ return Ok ( vec ! [ ] ) ;
239+ } ;
240+
241+ if rows. is_empty ( ) {
242+ return Ok ( vec ! [ ] ) ;
243+ }
244+
245+ // Group rows by timestamp into AshSnapshot instances.
246+ let mut snapshots: Vec < AshSnapshot > = Vec :: new ( ) ;
247+ let mut current_ts: i64 = i64:: MIN ;
248+ let mut snap = AshSnapshot :: default ( ) ;
249+
250+ for row in & rows {
251+ let ts: i64 = row. get ( 0 ) ;
252+ let wait_event: String = row. get ( 1 ) ;
253+ let cnt: i32 = row. get ( 2 ) ;
254+ let count = u32:: try_from ( cnt. max ( 0 ) ) . unwrap_or ( 0 ) ;
255+
256+ if ts != current_ts {
257+ if current_ts != i64:: MIN {
258+ snapshots. push ( snap) ;
259+ }
260+ snap = AshSnapshot {
261+ ts,
262+ ..Default :: default ( )
263+ } ;
264+ current_ts = ts;
265+ }
194266
195- // TODO: history mode (pg_ash Layer 2) — decode ash.samples.data int[] encoding
196- // once the format is documented and history mode is wired into the event loop.
197- Ok ( vec ! [ ] )
267+ // Parse "Type:Event" format back into (wtype, wevent).
268+ let ( wtype, wevent) = if let Some ( idx) = wait_event. find ( ':' ) {
269+ (
270+ wait_event[ ..idx] . to_owned ( ) ,
271+ wait_event[ idx + 1 ..] . to_owned ( ) ,
272+ )
273+ } else {
274+ // No colon — type == event (e.g. "CPU*").
275+ ( wait_event. clone ( ) , String :: new ( ) )
276+ } ;
277+
278+ // wait_timeline only gives us type+event, no query_id or query text.
279+ // Use an empty query label; drill-down to query level won't have data
280+ // from history but that's acceptable for timeline pre-population.
281+ fold_row ( & mut snap, & wtype, & wevent, None , "" , count) ;
282+ }
283+
284+ // Push the last accumulated snapshot.
285+ if current_ts != i64:: MIN {
286+ snapshots. push ( snap) ;
287+ }
288+
289+ Ok ( snapshots)
198290}
199291
200292// ---------------------------------------------------------------------------
@@ -315,4 +407,146 @@ mod tests {
315407 let snap = mock_snapshot ( & [ ( "CPU*" , "" , None , "" , 2 ) ] ) ;
316408 assert ! ( snap. by_query. contains_key( "CPU*//(unknown)" ) ) ;
317409 }
410+
411+ // --- pg_ash history integration tests ---
412+
413+ /// Parse a `"Type:Event"` string like `ash.wait_timeline` returns.
414+ fn parse_wait_event ( s : & str ) -> ( String , String ) {
415+ if let Some ( idx) = s. find ( ':' ) {
416+ ( s[ ..idx] . to_owned ( ) , s[ idx + 1 ..] . to_owned ( ) )
417+ } else {
418+ ( s. to_owned ( ) , String :: new ( ) )
419+ }
420+ }
421+
422+ /// Simulate building `AshSnapshot`s from `wait_timeline` rows
423+ /// (the same logic as `query_ash_history_interval`).
424+ fn build_snapshots_from_timeline ( rows : & [ ( i64 , & str , u32 ) ] ) -> Vec < AshSnapshot > {
425+ let mut snapshots: Vec < AshSnapshot > = Vec :: new ( ) ;
426+ let mut current_ts: i64 = i64:: MIN ;
427+ let mut snap = AshSnapshot :: default ( ) ;
428+
429+ for & ( ts, wait_event, count) in rows {
430+ if ts != current_ts {
431+ if current_ts != i64:: MIN {
432+ snapshots. push ( snap) ;
433+ }
434+ snap = AshSnapshot {
435+ ts,
436+ ..Default :: default ( )
437+ } ;
438+ current_ts = ts;
439+ }
440+
441+ let ( wtype, wevent) = parse_wait_event ( wait_event) ;
442+ fold_row ( & mut snap, & wtype, & wevent, None , "" , count) ;
443+ }
444+
445+ if current_ts != i64:: MIN {
446+ snapshots. push ( snap) ;
447+ }
448+
449+ snapshots
450+ }
451+
452+ #[ test]
453+ fn test_history_build_snapshots_basic ( ) {
454+ let rows = vec ! [
455+ ( 1000 , "CPU*" , 5 ) ,
456+ ( 1000 , "IO:DataFileRead" , 3 ) ,
457+ ( 1001 , "CPU*" , 4 ) ,
458+ ( 1001 , "Lock:relation" , 2 ) ,
459+ ( 1002 , "IO:WALWrite" , 1 ) ,
460+ ] ;
461+ let snaps = build_snapshots_from_timeline ( & rows) ;
462+
463+ assert_eq ! ( snaps. len( ) , 3 ) ;
464+
465+ // First snapshot: ts=1000, CPU*=5, IO=3
466+ assert_eq ! ( snaps[ 0 ] . ts, 1000 ) ;
467+ assert_eq ! ( snaps[ 0 ] . active_count, 8 ) ;
468+ assert_eq ! ( snaps[ 0 ] . by_type[ "CPU*" ] , 5 ) ;
469+ assert_eq ! ( snaps[ 0 ] . by_type[ "IO" ] , 3 ) ;
470+
471+ // Second snapshot: ts=1001, CPU*=4, Lock=2
472+ assert_eq ! ( snaps[ 1 ] . ts, 1001 ) ;
473+ assert_eq ! ( snaps[ 1 ] . active_count, 6 ) ;
474+ assert_eq ! ( snaps[ 1 ] . by_type[ "CPU*" ] , 4 ) ;
475+ assert_eq ! ( snaps[ 1 ] . by_type[ "Lock" ] , 2 ) ;
476+ assert_eq ! ( snaps[ 1 ] . by_event[ "Lock/relation" ] , 2 ) ;
477+
478+ // Third snapshot: ts=1002, IO=1
479+ assert_eq ! ( snaps[ 2 ] . ts, 1002 ) ;
480+ assert_eq ! ( snaps[ 2 ] . active_count, 1 ) ;
481+ assert_eq ! ( snaps[ 2 ] . by_type[ "IO" ] , 1 ) ;
482+ }
483+
484+ #[ test]
485+ fn test_history_build_snapshots_empty ( ) {
486+ let snaps = build_snapshots_from_timeline ( & [ ] ) ;
487+ assert ! ( snaps. is_empty( ) ) ;
488+ }
489+
490+ #[ test]
491+ fn test_history_snapshots_prepopulate_ring_buffer ( ) {
492+ use std:: collections:: VecDeque ;
493+
494+ let rows = vec ! [
495+ ( 100 , "CPU*" , 3 ) ,
496+ ( 101 , "IO:DataFileRead" , 2 ) ,
497+ ( 102 , "Lock:relation" , 1 ) ,
498+ ] ;
499+ let history = build_snapshots_from_timeline ( & rows) ;
500+
501+ // Simulate ring buffer pre-population (same logic as mod.rs).
502+ let mut ring: VecDeque < AshSnapshot > = VecDeque :: with_capacity ( 600 ) ;
503+ for snap in history {
504+ if ring. len ( ) == 600 {
505+ ring. pop_front ( ) ;
506+ }
507+ ring. push_back ( snap) ;
508+ }
509+
510+ assert_eq ! ( ring. len( ) , 3 ) ;
511+ assert_eq ! ( ring[ 0 ] . ts, 100 ) ;
512+ assert_eq ! ( ring[ 1 ] . ts, 101 ) ;
513+ assert_eq ! ( ring[ 2 ] . ts, 102 ) ;
514+ }
515+
516+ #[ test]
517+ fn test_history_parse_wait_event_with_colon ( ) {
518+ let ( wtype, wevent) = parse_wait_event ( "IO:DataFileRead" ) ;
519+ assert_eq ! ( wtype, "IO" ) ;
520+ assert_eq ! ( wevent, "DataFileRead" ) ;
521+ }
522+
523+ #[ test]
524+ fn test_history_parse_wait_event_no_colon ( ) {
525+ let ( wtype, wevent) = parse_wait_event ( "CPU*" ) ;
526+ assert_eq ! ( wtype, "CPU*" ) ;
527+ assert_eq ! ( wevent, "" ) ;
528+ }
529+
530+ #[ test]
531+ fn test_history_ring_buffer_capacity_limit ( ) {
532+ use std:: collections:: VecDeque ;
533+
534+ // Build 605 snapshots — ring buffer should keep only last 600.
535+ let rows: Vec < ( i64 , & str , u32 ) > = ( 0 ..605 ) . map ( |i| ( i64:: from ( i) , "CPU*" , 1 ) ) . collect ( ) ;
536+ let history = build_snapshots_from_timeline ( & rows) ;
537+
538+ let mut ring: VecDeque < AshSnapshot > = VecDeque :: with_capacity ( 600 ) ;
539+ for snap in history {
540+ if ring. len ( ) == 600 {
541+ ring. pop_front ( ) ;
542+ }
543+ ring. push_back ( snap) ;
544+ }
545+
546+ assert_eq ! ( ring. len( ) , 600 ) ;
547+ // Oldest kept snapshot should be ts=5 (first 5 were dropped).
548+ assert_eq ! ( ring[ 0 ] . ts, 5 ) ;
549+ // Newest should be ts=604.
550+ assert_eq ! ( ring[ 599 ] . ts, 604 ) ;
551+ }
318552}
0 commit comments