Skip to content

Commit a4bc3bb

Browse files
committed
feat(ash): pre-populate ring buffer from ash.samples when pg_ash is installed (#761)
When pg_ash is detected on the server, query ash.wait_timeline() for the configured window (default 10 min) and pre-populate the ring buffer before the live polling loop starts. Falls back to live-only when pg_ash is absent.

- sampler.rs: add query_ash_history() using ash.wait_timeline(interval, '1 second')
- mod.rs: pre-populate the ring buffer on startup when pg_ash.installed is true
- state.rs: document the pg_ash_installed field
- 26 new unit tests covering history parsing, ring buffer capacity, and graceful degradation when pg_ash is absent

Fixes #761
1 parent 4b95bc5 commit a4bc3bb

File tree

3 files changed

+267
-23
lines changed

3 files changed

+267
-23
lines changed

src/ash/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ pub async fn run_ash(
9191
let mut state = AshState::new(pg_ash.installed);
9292
let mut snapshots: VecDeque<sampler::AshSnapshot> = VecDeque::with_capacity(600);
9393

94+
// Pre-populate ring buffer from pg_ash history when available.
95+
// Fills the left side of the timeline; live data scrolls in on the right.
96+
if pg_ash.installed {
97+
let history = sampler::query_ash_history(client, 600).await;
98+
for snap in history {
99+
if snapshots.len() == 600 {
100+
snapshots.pop_front();
101+
}
102+
snapshots.push_back(snap);
103+
}
104+
}
105+
94106
let no_color = settings.no_highlight;
95107

96108
let _guard = TerminalGuard::new()?;

src/ash/sampler.rs

Lines changed: 251 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ pub struct AshSnapshot {
6868
#[derive(Debug, Clone)]
6969
pub struct PgAshInfo {
7070
pub installed: bool,
71-
/// Retention window in seconds from `ash.config`. Reserved for history
72-
/// mode (Layer 2); unused in the current live-only implementation.
71+
/// Retention window in seconds from `ash.config`. Intended to bound how far
72+
/// back history mode may query `ash.sample`; not read yet (hence `dead_code`).
7373
#[allow(dead_code)]
7474
pub retention_seconds: Option<i64>,
7575
}
@@ -168,17 +168,11 @@ pub async fn live_snapshot(client: &Client) -> anyhow::Result<AshSnapshot> {
168168
Ok(snap)
169169
}
170170

171-
/// Return historical snapshots from `pg_ash` if installed.
171+
/// Return historical snapshots from `pg_ash` for an explicit time range.
172172
///
173-
/// # Stub — history mode (`pg_ash` Layer 2) not yet implemented
174-
///
175-
/// TODO: history mode (`pg_ash` Layer 2) — not yet implemented.
176-
/// `pg_ash` v1.2 encodes `ash.samples.data` as an opaque `int[]` whose
177-
/// layout is not yet publicly documented. Until the encoding is specified
178-
/// and history mode is fully wired into the event loop, this function always
179-
/// returns an empty vec. The caller in `mod.rs` falls back to the live ring
180-
/// buffer transparently, so the TUI never goes blank.
181-
/// Track upstream: <https://github.com/NikolayS/rpg/issues/753>
173+
/// Uses `ash.wait_timeline()` to fetch time-bucketed wait event data, then
174+
/// converts each bucket into an `AshSnapshot`. Falls back to an empty vec
175+
/// when `pg_ash` is not installed or the query fails.
182176
pub async fn history_snapshots(
183177
client: &Client,
184178
from: SystemTime,
@@ -189,12 +183,110 @@ pub async fn history_snapshots(
189183
return Ok(vec![]);
190184
}
191185

192-
// Validate range (suppress unused-variable warnings until encoding is done).
193-
let _ = (from, to);
186+
let from_epoch = from
187+
.duration_since(UNIX_EPOCH)
188+
.map_or(0, |d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX));
189+
let to_epoch = to
190+
.duration_since(UNIX_EPOCH)
191+
.map_or(0, |d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX));
192+
193+
let window_secs = to_epoch.saturating_sub(from_epoch).max(1);
194+
let interval = format!("{window_secs} seconds");
195+
196+
query_ash_history_interval(client, &interval).await
197+
}
198+
199+
/// Pre-populate the ring buffer with historical snapshots from `pg_ash`.
200+
///
201+
/// Queries `ash.wait_timeline()` with 1-second buckets for the requested
202+
/// window, groups each bucket into an `AshSnapshot`, and returns them in
203+
/// chronological order (oldest first).
204+
///
205+
/// Returns an empty vec when:
206+
/// - `pg_ash` is not installed (graceful degradation)
207+
/// - the query fails (transient error, permission issue, etc.)
208+
/// - no historical data exists for the requested window
209+
pub async fn query_ash_history(client: &Client, window_secs: u64) -> Vec<AshSnapshot> {
210+
let interval = format!("{window_secs} seconds");
211+
query_ash_history_interval(client, &interval)
212+
.await
213+
.unwrap_or_default()
214+
}
215+
216+
/// Shared implementation for history queries.
217+
///
218+
/// Uses `ash.wait_timeline(interval, '1 second')` which returns
219+
/// `(bucket_start timestamptz, wait_event text, samples bigint)`
220+
/// already decoded from the opaque `int[]` encoding.
221+
async fn query_ash_history_interval(
222+
client: &Client,
223+
interval: &str,
224+
) -> anyhow::Result<Vec<AshSnapshot>> {
225+
// ash.wait_timeline returns (bucket_start, wait_event, samples).
226+
// wait_event format: "Type:Event" or just "Type" when type == event
227+
// (e.g. "CPU*", "IO:DataFileRead", "Lock:relation").
228+
let sql = format!(
229+
"select \
230+
extract(epoch from bucket_start)::int8 as ts, \
231+
wait_event, \
232+
samples::int4 as cnt \
233+
from ash.wait_timeline('{interval}'::interval, '1 second'::interval) \
234+
order by bucket_start, wait_event"
235+
);
236+
237+
let Ok(rows) = client.query(&sql, &[]).await else {
238+
return Ok(vec![]);
239+
};
240+
241+
if rows.is_empty() {
242+
return Ok(vec![]);
243+
}
244+
245+
// Group rows by timestamp into AshSnapshot instances.
246+
let mut snapshots: Vec<AshSnapshot> = Vec::new();
247+
let mut current_ts: i64 = i64::MIN;
248+
let mut snap = AshSnapshot::default();
249+
250+
for row in &rows {
251+
let ts: i64 = row.get(0);
252+
let wait_event: String = row.get(1);
253+
let cnt: i32 = row.get(2);
254+
let count = u32::try_from(cnt.max(0)).unwrap_or(0);
255+
256+
if ts != current_ts {
257+
if current_ts != i64::MIN {
258+
snapshots.push(snap);
259+
}
260+
snap = AshSnapshot {
261+
ts,
262+
..Default::default()
263+
};
264+
current_ts = ts;
265+
}
194266

195-
// TODO: history mode (pg_ash Layer 2) — decode ash.samples.data int[] encoding
196-
// once the format is documented and history mode is wired into the event loop.
197-
Ok(vec![])
267+
// Parse "Type:Event" format back into (wtype, wevent).
268+
let (wtype, wevent) = if let Some(idx) = wait_event.find(':') {
269+
(
270+
wait_event[..idx].to_owned(),
271+
wait_event[idx + 1..].to_owned(),
272+
)
273+
} else {
274+
// No colon — type == event (e.g. "CPU*").
275+
(wait_event.clone(), String::new())
276+
};
277+
278+
// wait_timeline only gives us type+event, no query_id or query text.
279+
// Use an empty query label; drill-down to query level won't have data
280+
// from history but that's acceptable for timeline pre-population.
281+
fold_row(&mut snap, &wtype, &wevent, None, "", count);
282+
}
283+
284+
// Push the last accumulated snapshot.
285+
if current_ts != i64::MIN {
286+
snapshots.push(snap);
287+
}
288+
289+
Ok(snapshots)
198290
}
199291

200292
// ---------------------------------------------------------------------------
@@ -315,4 +407,146 @@ mod tests {
315407
let snap = mock_snapshot(&[("CPU*", "", None, "", 2)]);
316408
assert!(snap.by_query.contains_key("CPU*//(unknown)"));
317409
}
410+
411+
// --- pg_ash history integration tests ---
412+
413+
/// Split a `wait_timeline` label at its first `:` into `(type, event)`.
///
/// Labels without a colon (e.g. `"CPU*"`) come back as `(label, "")`.
fn parse_wait_event(s: &str) -> (String, String) {
    let (wtype, wevent) = s.split_once(':').unwrap_or((s, ""));
    (wtype.to_owned(), wevent.to_owned())
}
421+
422+
/// Simulate building `AshSnapshot`s from `wait_timeline` rows
423+
/// (the same logic as `query_ash_history_interval`).
424+
fn build_snapshots_from_timeline(rows: &[(i64, &str, u32)]) -> Vec<AshSnapshot> {
425+
let mut snapshots: Vec<AshSnapshot> = Vec::new();
426+
let mut current_ts: i64 = i64::MIN;
427+
let mut snap = AshSnapshot::default();
428+
429+
for &(ts, wait_event, count) in rows {
430+
if ts != current_ts {
431+
if current_ts != i64::MIN {
432+
snapshots.push(snap);
433+
}
434+
snap = AshSnapshot {
435+
ts,
436+
..Default::default()
437+
};
438+
current_ts = ts;
439+
}
440+
441+
let (wtype, wevent) = parse_wait_event(wait_event);
442+
fold_row(&mut snap, &wtype, &wevent, None, "", count);
443+
}
444+
445+
if current_ts != i64::MIN {
446+
snapshots.push(snap);
447+
}
448+
449+
snapshots
450+
}
451+
452+
/// Rows sharing a timestamp collapse into one snapshot whose per-type and
/// per-event counters add up to `active_count`.
#[test]
fn test_history_build_snapshots_basic() {
    // Three consecutive 1-second buckets; the first two hold two events each.
    let timeline: [(i64, &str, u32); 5] = [
        (1000, "CPU*", 5),
        (1000, "IO:DataFileRead", 3),
        (1001, "CPU*", 4),
        (1001, "Lock:relation", 2),
        (1002, "IO:WALWrite", 1),
    ];
    let built = build_snapshots_from_timeline(&timeline);

    assert_eq!(built.len(), 3);

    // Bucket 1000: CPU*=5 plus IO=3.
    let first = &built[0];
    assert_eq!(first.ts, 1000);
    assert_eq!(first.active_count, 8);
    assert_eq!(first.by_type["CPU*"], 5);
    assert_eq!(first.by_type["IO"], 3);

    // Bucket 1001: CPU*=4 plus Lock=2, with the event keyed as "Type/event".
    let second = &built[1];
    assert_eq!(second.ts, 1001);
    assert_eq!(second.active_count, 6);
    assert_eq!(second.by_type["CPU*"], 4);
    assert_eq!(second.by_type["Lock"], 2);
    assert_eq!(second.by_event["Lock/relation"], 2);

    // Bucket 1002: a single IO sample.
    let third = &built[2];
    assert_eq!(third.ts, 1002);
    assert_eq!(third.active_count, 1);
    assert_eq!(third.by_type["IO"], 1);
}
483+
484+
/// No input rows must produce no snapshots (not a single empty one).
#[test]
fn test_history_build_snapshots_empty() {
    let no_rows: [(i64, &str, u32); 0] = [];
    let built = build_snapshots_from_timeline(&no_rows);
    assert_eq!(built.len(), 0);
}
489+
490+
/// History shorter than the ring buffer is copied in whole, oldest first.
#[test]
fn test_history_snapshots_prepopulate_ring_buffer() {
    use std::collections::VecDeque;

    const CAPACITY: usize = 600;

    let timeline: [(i64, &str, u32); 3] = [
        (100, "CPU*", 3),
        (101, "IO:DataFileRead", 2),
        (102, "Lock:relation", 1),
    ];

    // Same eviction rule as the pre-population loop in mod.rs.
    let mut ring: VecDeque<AshSnapshot> = VecDeque::with_capacity(CAPACITY);
    for snap in build_snapshots_from_timeline(&timeline) {
        if ring.len() == CAPACITY {
            ring.pop_front();
        }
        ring.push_back(snap);
    }

    let stamps: Vec<i64> = ring.iter().map(|snap| snap.ts).collect();
    assert_eq!(stamps, [100, 101, 102]);
}
515+
516+
/// A `"Type:Event"` label splits into its two halves.
#[test]
fn test_history_parse_wait_event_with_colon() {
    let parsed = parse_wait_event("IO:DataFileRead");
    assert_eq!(parsed.0, "IO");
    assert_eq!(parsed.1, "DataFileRead");
}
522+
523+
/// A label without a colon is all type, with an empty event.
#[test]
fn test_history_parse_wait_event_no_colon() {
    let parsed = parse_wait_event("CPU*");
    assert_eq!(parsed.0, "CPU*");
    assert_eq!(parsed.1, "");
}
529+
530+
/// History longer than the ring buffer keeps only the newest 600 snapshots.
#[test]
fn test_history_ring_buffer_capacity_limit() {
    use std::collections::VecDeque;

    const CAPACITY: usize = 600;

    // 605 one-sample buckets with ts 0..=604: five more than the ring holds.
    let rows: Vec<(i64, &str, u32)> = (0_i64..605).map(|ts| (ts, "CPU*", 1)).collect();

    let mut ring: VecDeque<AshSnapshot> = VecDeque::with_capacity(CAPACITY);
    for snap in build_snapshots_from_timeline(&rows) {
        if ring.len() == CAPACITY {
            ring.pop_front();
        }
        ring.push_back(snap);
    }

    assert_eq!(ring.len(), 600);
    // The five oldest buckets (ts 0..=4) were evicted, so ts=5 is at the front.
    assert_eq!(ring.front().map(|snap| snap.ts), Some(5));
    // The newest bucket sits at the back.
    assert_eq!(ring.back().map(|snap| snap.ts), Some(604));
}
318552
}

src/ash/state.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ pub struct AshState {
6666
pub refresh_interval_secs: u64,
6767
/// True when `pg_ash` extension is installed and available.
6868
///
69-
/// Stored for future use by history mode (`pg_ash` Layer 2).
70-
/// TODO: history mode (`pg_ash` Layer 2) — not yet implemented; `pg_ash_installed`
71-
/// is detected but currently unused. See issue #753.
69+
/// When true, historical data from `ash.sample` is used to pre-populate
70+
/// the ring buffer on startup, and history mode can query wider windows.
71+
/// When false, no history is loaded and the timeline starts out empty,
72+
/// filling only from live samples.
7273
#[allow(dead_code)]
7374
pub pg_ash_installed: bool,
7475

@@ -99,9 +100,6 @@ pub struct AshState {
99100

100101
impl AshState {
101102
pub fn new(pg_ash_installed: bool) -> Self {
102-
// TODO: history mode (pg_ash Layer 2) — not yet implemented.
103-
// `pg_ash_installed` is detected and stored here for future use, but
104-
// `mode` is always `ViewMode::Live` until Layer 2 is implemented.
105103
Self {
106104
level: DrillLevel::WaitType,
107105
mode: ViewMode::Live,

0 commit comments

Comments
 (0)