
feat(engine): Trust-system Arc 1 wave 2 — persist RunRecord on rocky run #203

Merged
hugocorreia90 merged 2 commits into main from feat/arc1-wave2-record-run on Apr 21, 2026

Conversation

@hugocorreia90
Contributor

Summary

Wires StateStore::record_run into every exit path of rocky run (happy / interrupted / model-only). Until this PR the RUN_HISTORY redb table was defined and queryable but never written to in production — so rocky history, rocky replay, rocky trace, and the new rocky cost all read from an empty store. The POC 00-foundations/06-branches-replay-lineage/run.sh documented the gap explicitly: "no records yet; write path is Arc 1 wave 2". This is that wave.

What changes for the user

Running any pipeline now populates the run history:

$ rocky run
$ rocky history        # 1 run, status Success, trigger Manual
$ rocky replay latest  # SQL hashes, per-model status, timings
$ rocky cost latest    # bytes + duration + computed $ per model

The POC 00-foundations/06-branches-replay-lineage exercises the full flow end-to-end and writes three additional expected fixtures (history.json, replay_latest.json, cost_latest.json). The || true swallow on step 6 is gone.

Engine changes

  • RunOutput::to_run_record(run_id, started, finished, config_hash, trigger, status) → rocky_core::state::RunRecord. Materializations → success-status ModelExecutions; errors → failed entries. Per-model timestamps are lossily reconstructed as finished_at = run.finished_at and started_at = finished - duration — honest (every field is true; actual wall-clock windows need execution-loop instrumentation, deferred).
  • RunOutput::derive_run_status() maps interrupted / tables_failed / tables_copied → Success / PartialFailure / Failure.
  • config_fingerprint(&Path) — 16-char hex of raw rocky.toml bytes via DefaultHasher (same pattern as sql_fingerprint). Stored on RunRecord.config_hash.
  • persist_run_record helper in run.rs — warn-on-error, never fails the command. Matches the resilience posture of state_sync aborts elsewhere in the file. Called at all 3 exit paths right after populate_cost_summary.

9 new unit tests cover the helpers; 205 rocky-cli + 976 rocky-core tests stay green.

Pre-existing drift exposed (fixed in-line)

rocky history and rocky optimize returned empty arrays before, so Pydantic shape drift in dagster_rocky.types went unnoticed. Now that real data flows:

  • The hand-written HistoryResult class mirrored the state-store RunRecord shape (finished_at, config_hash, models_executed as a list) — not what rocky history --json actually emits. This completes the Phase 2 "soft swap" for history that was never done: HistoryResult = HistoryOutput, ModelHistoryResult = ModelHistoryOutput. tests/scenarios.py::HISTORY and the parse assertion are updated to the CLI shape.
  • OptimizeRecommendation (Rust) projected only a five-field slim view, while Dagster's checks.py:54-59 reads compute_cost_per_run, storage_cost_per_month, and downstream_references from each recommendation. Those fields exist on rocky_core::optimize::MaterializationCost and are now propagated through the CLI projection; checks.py consumers work without change.

Fixture determinism

scripts/_normalize_fixture.py gained WALL_CLOCK_ID_FIELDS = {"run_id"} → sentinelled to "run-SENTINEL" (the ids contain YYYYMMDD-HHMMSS-NNN timestamps). Confirmed byte-stable across two regen invocations. 4 fixtures picked up real data: history.json, anomaly/history.json, optimize.json, optimize/optimize.json.

What's deferred

  • Real per-model start timestamps. Current capture is finish-relative (finished_at = run_finished_at for every model). Honest but lossy under parallel execution. Needs execution-loop instrumentation — separate wave.
  • bytes_scanned / bytes_written per model. MaterializationOutput still doesn't carry these from the adapter layer, so they stay None on the persisted ModelExecution. Same plumbing work already called out on populate_cost_summary.
  • Resume semantics. rocky run --resume-latest mints a new run_id today (see run.rs:843), so resume does not overwrite the prior record — new entry with resumed_from metadata. If you want resume to replace the original record, that's a design decision for a follow-up.

Test plan

  • cargo test -p rocky-cli -p rocky-core — 1240 tests pass
  • cargo clippy --workspace --all-targets -- -D warnings clean
  • cargo fmt --all --check clean
  • just codegen idempotent (schemas + bindings regenerated and committed for the new optimize fields)
  • just regen-fixtures byte-stable across two invocations
  • uv run pytest in integrations/dagster/ — 307 tests pass
  • npm run compile in editors/vscode/ — clean
  • End-to-end: rocky run in playground POC → rocky history, rocky replay latest, rocky cost latest all return real data

Wires `StateStore::record_run` into every exit path of `rocky run`
(happy / interrupted / model-only). Until now the `RUN_HISTORY` redb
table was defined + queryable but never written to in production — so
`rocky history`, `rocky replay`, `rocky trace`, and the new `rocky cost`
all read from an empty store. The POC's `run.sh` even documented the
gap: *"no records yet; write path is Arc 1 wave 2"*. This is that wave.

## What changes for the user

Running any pipeline now populates the run history:

    $ rocky run
    $ rocky history        # 1 run, status Success, trigger Manual
    $ rocky replay latest  # SQL hashes, per-model status, timings
    $ rocky cost latest    # bytes + duration + computed $ per model

The POC `00-foundations/06-branches-replay-lineage` exercises the full
flow end-to-end; `|| true` on step 6 is gone.

## Engine changes

- `RunOutput::to_run_record(run_id, started, finished, config_hash,
  trigger, status)` → `rocky_core::state::RunRecord`. Materializations
  → success-status `ModelExecution`s; `errors` → failed entries.
  Per-model timestamps are lossily reconstructed as
  `finished_at = run.finished_at`, `started_at = finished - duration`
  (honest — every field is true, actual wall-clock windows need
  execution-loop instrumentation, deferred).
- `RunOutput::derive_run_status()` maps `interrupted` / `tables_failed`
  / `tables_copied` → `Success` / `PartialFailure` / `Failure`.
- `config_fingerprint(&Path)` — 16-char hex of the raw rocky.toml
  bytes via `DefaultHasher` (same pattern as `sql_fingerprint`). Stored
  on `RunRecord.config_hash` so consumers can detect "did the config
  change between these two runs?" without diffing full TOML.
- `persist_run_record` helper in `run.rs` — warn-on-error, never fails
  the command. Matches the resilience posture of state_sync aborts
  elsewhere in the file. Called at all 3 exit paths after
  `populate_cost_summary`. Hedged sketches of the fingerprint,
  status-derivation, and persistence helpers follow this list.
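
A minimal sketch, with assumed signatures — the stub types stand in
for the real `rocky_core` ones, and the status mapping below is one
reading of the bullet above, not the engine's exact ordering:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::path::Path;

    // Stubs so the sketch stands alone; the real types live in rocky_core.
    struct StateStore;
    struct RunRecord;
    enum RunStatus {
        Success,
        PartialFailure,
        Failure,
    }

    impl StateStore {
        fn record_run(&self, _record: &RunRecord) -> std::io::Result<()> {
            Ok(()) // stand-in for the redb RUN_HISTORY write
        }
    }

    /// 16-char hex over the raw rocky.toml bytes — same DefaultHasher
    /// pattern as sql_fingerprint; any byte-level edit changes the hash.
    fn config_fingerprint(path: &Path) -> std::io::Result<String> {
        let bytes = std::fs::read(path)?;
        let mut hasher = DefaultHasher::new();
        bytes.hash(&mut hasher);
        Ok(format!("{:016x}", hasher.finish()))
    }

    /// Assumed reading of the status mapping: interruption wins, a clean
    /// run is Success, failures with zero copies are a full Failure.
    fn derive_run_status(interrupted: bool, failed: usize, copied: usize) -> RunStatus {
        match (interrupted, failed, copied) {
            (true, _, _) => RunStatus::Failure,
            (_, 0, _) => RunStatus::Success,
            (_, _, 0) => RunStatus::Failure,
            _ => RunStatus::PartialFailure,
        }
    }

    /// Warn-on-error: a failed history write must never fail `rocky run`.
    fn persist_run_record(store: &StateStore, record: &RunRecord) {
        if let Err(err) = store.record_run(record) {
            eprintln!("warning: failed to persist run record: {err}");
        }
    }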

9 new unit tests cover the helpers; 205 rocky-cli + 976 rocky-core
tests stay green.

## Pre-existing drift exposed (fixed in-line)

`rocky history` and `rocky optimize` returned empty arrays before, so
Pydantic shape drift in `dagster_rocky.types` went unnoticed. Now
that real data flows:

- **HistoryResult** — the hand-written class mirrored the *state-store*
  `RunRecord` shape (`finished_at`, `config_hash`, `models_executed` as
  a list), not what `rocky history --json` actually emits. This
  completes the Phase 2 "soft swap" that was never done for history:
  `HistoryResult = HistoryOutput`, `ModelHistoryResult =
  ModelHistoryOutput`. `tests/scenarios.py::HISTORY` and the parse
  assertion are updated to the CLI shape.
- **OptimizeRecommendation** (Rust) — projected only a five-field slim
  view, while Dagster's `checks.py:54-59` reads `compute_cost_per_run`,
  `storage_cost_per_month`, and `downstream_references` from each
  recommendation. Those fields exist on
  `rocky_core::optimize::MaterializationCost` and are now propagated
  through the CLI projection; `checks.py` consumers work without change.
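
A hedged sketch of that projection — the three cost-field names come
from `checks.py`; the surrounding struct shape and the original slim
fields are assumed:

    // Illustrative shapes only; the real projection lives in the
    // rocky-cli optimize command.
    struct MaterializationCost {
        compute_cost_per_run: f64,
        storage_cost_per_month: f64,
        downstream_references: u64,
        // ...duration / bytes internals elided
    }

    struct OptimizeRecommendation {
        model: String,               // stands in for the original slim fields
        compute_cost_per_run: f64,   // newly propagated
        storage_cost_per_month: f64, // newly propagated
        downstream_references: u64,  // newly propagated
    }

    fn project(model: String, cost: &MaterializationCost) -> OptimizeRecommendation {
        OptimizeRecommendation {
            model,
            compute_cost_per_run: cost.compute_cost_per_run,
            storage_cost_per_month: cost.storage_cost_per_month,
            downstream_references: cost.downstream_references,
        }
    }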

## Fixture determinism

`scripts/_normalize_fixture.py` gained `WALL_CLOCK_ID_FIELDS =
{"run_id"}` → sentinelled to `"run-SENTINEL"` (the ids contain
`YYYYMMDD-HHMMSS-NNN` timestamps). Regen is byte-stable across
invocations. 4 fixtures picked up real data: `history.json`,
`anomaly/history.json`, `optimize.json`, `optimize/optimize.json`.

## Test plan

- [x] `cargo test -p rocky-cli -p rocky-core` — 1240 tests pass
- [x] `cargo clippy --workspace --all-targets -- -D warnings` clean
- [x] `cargo fmt --all --check` clean
- [x] `just codegen` idempotent (schemas + bindings regenerated and
      committed for the new optimize fields)
- [x] `just regen-fixtures` byte-stable across two invocations
- [x] `uv run pytest` in `integrations/dagster/` — 307 tests pass
- [x] `npm run compile` in `editors/vscode/` — clean
- [x] End-to-end: `rocky run` in playground POC → `rocky history`,
      `rocky replay latest`, `rocky cost latest` all return real data

CI's codegen-drift check flagged jitter in optimize.json:
`compute_cost_per_run` varied 3e-06 → 5e-06 across machines. The engine
computes it as `avg_duration_seconds * compute_cost_per_second` before
emitting, so the raw duration field (already zeroed by the normalizer)
is irrelevant — the derived product reaches the fixture as-is.

Extended `_normalize_fixture.py` with `DERIVED_FROM_WALL_CLOCK_FIELDS`
({'compute_cost_per_run', 'estimated_monthly_savings'}) and a small
int-vs-float preservation branch so 0.0 stays 0.0 (no churn against
previously-zero float fixtures).

Confirmed byte-stable across two consecutive regens on the same
machine; 307 dagster tests still pass.
hugocorreia90 merged commit de7155d into main on Apr 21, 2026
16 checks passed
hugocorreia90 deleted the feat/arc1-wave2-record-run branch on April 21, 2026 at 16:02
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
Adds reference + features-page coverage for the commands that landed
in PRs #199-#203, and populates the CHANGELOG's [Unreleased] section
for the upcoming coordinated release cut.

- `docs/reference/commands/administration.md` — new `rocky cost
  <run_id|latest>` section with JSON + table examples and adapter
  coverage notes (Databricks/Snowflake duration-based; BigQuery bytes;
  DuckDB zero; discovery adapters skipped). Added to the related-
  commands list on `rocky trace` so the three sibling readers
  (replay/trace/cost) cross-link.
- `docs/reference/commands/core-pipeline.md` — added `rocky branch
  compare <name>` to the branch subcommand header and a short usage
  example explaining the `ShadowConfig.schema_override` mechanism it
  shares with `rocky run --branch`.
- `docs/features/all-features.md` — added `cost` to the administration
  command roster and a bullet under Observability for the historical-
  rollup surface.
- `engine/CHANGELOG.md` — populated [Unreleased] with the five PRs
  shipped 2026-04-21 (SIGPIPE, branch compare, POC portability cleanup,
  rocky cost, record_run wiring) plus the Pydantic soft-swap + fixture
  normalizer internals. Ready for the next coordinated release cut.

Docs build: `cd docs && npm run build` — 69 pages built, no warnings.
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
…ps (#205)

The tape's name has always promised replay; the content never showed
it. Now that PR #203 makes `rocky replay` / `rocky cost` return real
data end-to-end (Arc 1 wave 2's record_run wiring), the demo can
finally deliver on its name.

New step sequence after `rocky run --branch fix_revenue`:

    rocky replay latest --output table   # per-model SQL hashes,
                                         # status, timings for the
                                         # branch's run
    rocky cost latest --output table     # per-model + total cost
                                         # rollup ($0.00 for DuckDB)

Lineage stays as the final flourish. Tape preamble also adds an
explicit `export PATH=...engine/target/release:$PATH` so the demo
uses the local build rather than whatever `rocky` happens to be on
the global PATH — relevant until the next release cut.

Rendered at 953 KB / 1200×700 via `vhs` — within the 5 MB asset
budget. Tape source lives in `~/Developer/rocky-live/assets/` (private
library); this commit updates only the public-facing GIF embedded in
the README and the Astro docs.
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
#206)

Replaces the lossy reconstruction PR #203 left behind: per-model
started_at was derived as `finished_at - duration_ms` (with every
model sharing the run's finished_at), which collapses parallel
executions to a single endpoint and mis-orders overlapping models.

Now MaterializationOutput carries `started_at: DateTime<Utc>` as a
required field, captured with `Utc::now()` at the moment each model
begins execution:

- process_table — `table_started_at` alongside the existing `table_start`
  Instant, used in the parallel-dispatch path.
- execute_time_interval_partition — `partition_started_at` alongside
  `partition_start`, used per partition.
- run_snapshot — `started_at` alongside `start`, used by the
  transformation pipeline path.

RunOutput::to_run_record now uses each materialization's
`started_at` directly and derives the per-model `finished_at` as
`started_at + duration_ms`. Parallel runs preserve their real
ordering on the persisted RunRecord; `rocky replay` / `rocky trace`
/ `rocky history` / `rocky cost` all read correct wall-clock
windows instead of collapsed finish-relative ones.
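
A minimal sketch of the capture pattern — function names come from the
list above, but the signatures and trimmed field set are assumed:

    use chrono::{DateTime, Duration, Utc};
    use std::time::Instant;

    struct MaterializationOutput {
        started_at: DateTime<Utc>, // wall clock — persisted on the RunRecord
        duration_ms: u64,          // still measured off a monotonic Instant
    }

    fn process_table() -> MaterializationOutput {
        let table_start = Instant::now();  // monotonic: immune to clock steps
        let table_started_at = Utc::now(); // wall clock: real run ordering
        // ... execute the model ...
        MaterializationOutput {
            started_at: table_started_at,
            duration_ms: table_start.elapsed().as_millis() as u64,
        }
    }

    // to_run_record side: finished_at stays derived, so a single field
    // drives the started_at + duration invariant.
    fn finished_at(m: &MaterializationOutput) -> DateTime<Utc> {
        m.started_at + Duration::milliseconds(m.duration_ms as i64)
    }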

The existing `WALL_CLOCK_FIELDS` set in scripts/_normalize_fixture.py
already includes `started_at`, so the new field gets sentinelled to
`2000-01-01T00:00:00Z` in regenerated fixtures — byte-stable across
invocations. Confirmed across two consecutive `just regen-fixtures`
runs on the same machine.

## Test plan

- [x] `cargo test -p rocky-cli -p rocky-core` — 205 + 976 tests pass
- [x] `cargo clippy --workspace --all-targets -- -D warnings` clean
- [x] `cargo fmt --all --check` clean
- [x] `just codegen` regenerated schemas + Pydantic + TypeScript
- [x] `just regen-fixtures` byte-stable across two regens
- [x] `uv run pytest` in integrations/dagster — 307 tests pass
- [x] `npm run compile` in editors/vscode — clean
- [x] End-to-end: `rocky run` → `rocky replay latest` shows a real
      wall-clock `started_at` distinct from the run's `finished_at`

## What's deferred

- Real per-model `finished_at` — currently derived from
  `started_at + duration_ms`. For strict fidelity the runtime could
  stamp Utc::now() at the actual completion site, but that's two
  sources of truth for an identity that already holds. Keeping the
  derivation means a single field drives the invariant.
- `bytes_scanned` / `bytes_written` plumbing — still `None` on
  MaterializationOutput (adapter-layer work). Same follow-up called
  out on `populate_cost_summary`.
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
* chore(engine): release 1.12.0

Arc 1 wave 2 + cleanup cascade. Eight PRs since v1.11.0:

- #199 SIGPIPE handler
- #200 rocky branch compare
- #201 POC target_dialect cleanup
- #202 rocky cost <run_id|latest> (Arc 2 wave 2 first PR)
- #203 rocky run persists RunRecord (Arc 1 wave 2 load-bearing fix)
- #204 docs + CHANGELOG [Unreleased] cascade
- #205 demo-branches-replay.gif refresh
- #206 real per-model started_at on MaterializationOutput

rocky history / replay / trace / cost now return real data end-to-end
for the first time. Full notes in CHANGELOG.

* feat(state): configurable transfer timeout + tracing span + Valkey wrap

- `StateConfig.transfer_timeout_seconds` (default 300s) replaces the hard-
  coded `STATE_TRANSFER_TIMEOUT`. Operators can now tune the wall-clock
  budget in `rocky.toml` for very large state or slow networks without
  recompiling. `StateConfig` gains a manual `Default` impl so
  `StateConfig::default()` yields 300s (not u64's zero).
- `state.upload` / `state.download` tracing spans now wrap every
  transfer, carrying `backend`, `bucket`, and `size_bytes`. The warn
  event emitted when the timeout elapses inherits those fields
  automatically, so hung transfers are diagnosable from stderr logs
  alone (which dagster-rocky streams into the Dagster run viewer).
- Structured `warn!` on timeout elapse ("state transfer exceeded timeout
  budget") with a `duration_ms` field — replaces the previously silent
  `Timeout(_)` return.
- Valkey read/write paths audited and closed: `redis::Client::get_connection`
  and `redis::cmd(...).query()` are synchronous and blocked the tokio
  runtime thread, so no outer `tokio::time::timeout` could rescue them.
  Both `upload_to_valkey` and `download_from_valkey` now run under
  `tokio::task::spawn_blocking` inside `with_transfer_timeout`, closing
  the same class of hang the object-store paths were already protected
  against (sketched after this list).
- `default_client_options()` in `object_store.rs` honours the standard
  `object_store`-crate env vars `AWS_ALLOW_HTTP` / `AZURE_ALLOW_HTTP` /
  `GOOGLE_STORAGE_ALLOW_HTTP`. Always off in production; the new
  integration test uses it to point the S3 client at a plain-HTTP
  wiremock server without bypassing the credential chain.
- New `tests/state_sync_timeout_test.rs` integration test: a wiremock S3
  endpoint that holds PutObject for 1h proves `upload_state` returns
  `StateSyncError::Timeout` within the configured 2s budget (plus
  grace). A fast-responding endpoint serves as the negative control
  against regressions.
- CHANGELOG entries added under [1.12.0]. Example config in
  `engine/examples/dagster-integration/rocky.toml` surfaces the new key.
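
A hedged sketch of the Valkey wrap — `with_transfer_timeout` is inlined
here, and the error variants are illustrative rather than the exact
`StateSyncError` definition:

    use std::time::Duration;

    #[derive(Debug)]
    enum StateSyncError {
        Timeout(Duration),
        Backend(String),
    }

    // The sync redis calls move onto the blocking pool, so the outer
    // tokio::time::timeout can fire while they run.
    async fn upload_to_valkey(
        client: redis::Client,
        key: String,
        bytes: Vec<u8>,
        budget: Duration,
    ) -> Result<(), StateSyncError> {
        let work = tokio::task::spawn_blocking(move || {
            let mut conn = client.get_connection()?; // blocks a pool thread only
            redis::cmd("SET").arg(&key).arg(bytes).query::<()>(&mut conn)
        });
        match tokio::time::timeout(budget, work).await {
            Err(_elapsed) => Err(StateSyncError::Timeout(budget)),
            Ok(joined) => joined
                .map_err(|e| StateSyncError::Backend(e.to_string()))?
                .map_err(|e| StateSyncError::Backend(e.to_string())),
        }
    }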

cargo fmt clean; `clippy --workspace --all-targets -- -D warnings` clean;
all 977 rocky-core unit tests + 30 e2e + 20 integration + the 2 new
timeout tests pass.

* chore(codegen): regenerate schemas + pydantic types for StateConfig.transfer_timeout_seconds

* chore(fixtures): regenerate dagster test fixtures for 1.12.0

`just regen-fixtures` — version string bump only (1.11.0 → 1.12.0) across
35 captured fixtures under integrations/dagster/tests/fixtures_generated/.
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
* chore(dagster): release 1.8.0

Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade).

- #202 CostOutput + PerModelCostHistorical Pydantic bindings
- #203 OptimizeRecommendation gains 3 cost fields; HistoryResult soft-swap
- #203 Fixture normalizer: WALL_CLOCK_ID_FIELDS + derived-numerics
- #206 MaterializationOutput.started_at (real per-model wall-clock)

Full notes in integrations/dagster/CHANGELOG.md.

* fix(resource): eliminate two-readers race in run_streaming via watchdog

The previous _run_rocky_streaming implementation had the stderr
forwarder thread reading proc.stderr via TextIOWrapper iteration while
the main thread called proc.communicate(timeout=). Two concurrent
readers on the same pipe FD violates CPython's documented subprocess
contract — under stderr traffic the TimeoutExpired path intermittently
failed to fire, causing the 2026-04-18 and 2026-04-19 production hangs
(11.5h rocky run on the second incident, triggering a 58-run Dagster
queue backlog).

Rewrite to use dedicated single-reader threads per pipe + an external
watchdog that kills the process group via os.killpg(SIGKILL). The
watchdog enforces the wall-clock timeout independent of pipe-FD
semantics — traffic patterns on stderr no longer affect enforcement.

Changes in src/dagster_rocky/resource.py:
- Popen now passes start_new_session=True on POSIX so rocky gets its
  own process group (lets os.killpg reach any children rocky spawns).
- New _accumulate_stdout helper: sole reader of proc.stdout.
- Existing _forward_stderr_to_context is the sole reader of
  proc.stderr (docstring updated to call this out).
- Watchdog thread driven by threading.Event.wait(timeout_seconds); on
  timeout, calls os.killpg(SIGKILL) (POSIX) or proc.kill() (Windows)
  and sets the event. ProcessLookupError / OSError swallowed.
- Main thread calls proc.wait() with no timeout. finally block:
  fired.set() (dismiss watchdog) -> watchdog.join(1.0) ->
  stderr_reader.join(2.0) -> stdout_reader.join(2.0).
- Watchdog-fire detected via returncode == -signal.SIGKILL on POSIX;
  raises dg.Failure with description "Rocky command timed out after
  Ns (watchdog-killed)" and stderr_tail / duration_ms / pid metadata.
- Structured INFO log line on Popen success (pid, timeout_s, cmd)
  and on process exit (pid, returncode, duration_ms, outcome:
  success | failure | partial-success | timeout-killed). Closes the
  observability gap behind the prior incident post-mortems.
- All prior semantics preserved: allow_partial, partial-success JSON
  detection, version check, _build_cmd.

Audit of run_pipes (A.2): dg.PipesSubprocessClient.run() in Dagster
1.13.1 calls subprocess.Popen with stdout=stderr=None (inherit) by
default, no reader threads, and process.wait() without a timeout. No
two-readers race possible. Separate issue: pipes has no wall-clock
timeout enforcement at all (only SIGINT on Dagster cancel) — out of
scope for this release.

Tests (tests/test_resource.py):
- Update _streaming_popen_mock: proc.stdout now an iterator (was a
  MagicMock); proc.pid set to a realistic int. Required because the
  new stdout accumulator thread iterates proc.stdout.
- Rewrite test_run_streaming_timeout_kills_proc_and_raises: target
  proc.wait() + os.killpg mocks instead of proc.communicate raising
  TimeoutExpired. Exercises the real threading.Event.wait watchdog
  path end-to-end. Asserts os.killpg called once, Failure description
  contains "1s" and "watchdog-killed".
- Add test_run_streaming_hard_kills_hung_binary_with_stderr_chatter:
  real POSIX shell script that hangs forever spamming stderr at 20Hz
  (the exact production pattern). Asserts dg.Failure raised within
  5s of a 2s timeout. Skipped on Windows.
- Add test_run_streaming_timeout_fires_natively_without_daemon_reader:
  negative control. Same hang script, stderr forwarder monkeypatched
  to a no-op. Documents that the watchdog's effectiveness is
  pipe-FD-independent — external SIGKILL bypasses the race entirely.
- All 309 existing tests still pass; ruff check + format clean.

uv.lock picks up the 1.7.0 -> 1.8.0 version bump already committed in
1abb6a4 (release commit for 1.8.0).

* docs(dagster): CHANGELOG 1.8.0 - subprocess watchdog + race fix

Document the two-readers race fix in dagster-rocky 1.8.0:

- ### Fixed entry describes the proc.stderr two-readers race between
  the daemon forwarder and proc.communicate(timeout=), production
  impact (2026-04-18 and 2026-04-19 hangs), and the A.2 audit outcome
  for run_pipes (upstream PipesSubprocessClient is safe — inherits
  stdio by default, no reader threads, process.wait() without timeout).
- ### Added entries describe the three mechanism changes:
  - Watchdog-based timeout enforcement via threading.Event + os.killpg
    (with start_new_session=True for POSIX process-group isolation).
  - Single-reader threads for stdout and stderr restoring conformance
    with CPython's documented subprocess contract.
  - Structured start/end logging with pid, returncode, duration_ms,
    outcome — closes the observability gap behind prior post-mortems.

Entries go under [1.8.0] (not [Unreleased]) because PR #208 is the
release PR; these changes ship with the 1.8.0 publish when it merges.

* test(resource): address code-quality review on run_streaming regression tests

- drop the unused `_HANG_WITH_STDERR_CHATTER_SCRIPT` module-level constant
  (the tests construct the script inline via `_write_hang_fake`)
- replace bare `pass` in `_noop_forwarder`'s except with an explanatory
  comment + `return`, documenting that drain-only helpers swallow the
  same pipe-close / decode errors the real forwarder does
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade). Regenerated
TypeScript bindings + project schema; no feature changes to the
extension itself.

- #202 CostOutput + PerModelCostHistorical TypeScript
- #203 OptimizeRecommendation cost fields
- #206 MaterializationOutput.started_at

Full notes in editors/vscode/CHANGELOG.md.