
feat(engine): Trust-system Arc 2 wave 2 — rocky cost <run_id|latest> #202

Merged
hugocorreia90 merged 1 commit into main from feat/arc2-rocky-cost
Apr 21, 2026

Conversation

@hugocorreia90 (Contributor)

Summary

Trust-system Arc 2 wave 2 — first PR. Adds a new rocky cost <run_id|latest> CLI verb that reads per-run cost attribution from the embedded state store and rolls it up.

Arc 2 first wave (PR #171, engine v1.11.0) shipped per-run cost on RunOutput.cost_summary + declarative [budget] blocks. This PR adds the historical query surface over stored runs — "what did my last run cost?" and "what has this model cost over the last N runs?" answerable from the recorded run alone, without re-materialising.

The command re-derives per-model cost via rocky_core::cost::compute_observed_cost_usd — the same formula RunOutput::populate_cost_summary applies at the end of a live run — so the historical surface stays consistent with the first-wave per-run summary without sharing storage.

BigQuery quiet upgrade: because ModelExecution.bytes_scanned is persisted in the state store, this command returns a real cost figure for BQ runs even though the live rocky run path still reports cost_usd: null for BQ (adapter bytes-scanned plumbing is a later wave).

Plan reference: ~/Developer/rocky-plans/plans/rocky-trust-system-direction.md §"Arc 2".

In scope (this PR)

  • New rocky cost <run_id|latest> [--model <name>] [--output json|table] subcommand.
  • Reads RunRecord from the state store (redb) and uses per-model bytes_scanned, bytes_written, duration_ms from ModelExecution.
  • New CostOutput + PerModelCostHistorical output types deriving JsonSchema, registered in export_schemas.rs::schemas().
  • JSON + table output formats; --model filter mirrors rocky replay.
  • Adapter type resolved from rocky.toml: picks adapter.default when present, falls back to first-declared; unbilled-adapter (e.g. fivetran) and config-load failures degrade gracefully to adapter_type: null / cost_usd: null while still emitting bytes + duration.
  • 12 unit tests (rollup math, adapter-missing degrade, BigQuery bytes path, model filter, by-id + latest resolution, adapter-default precedence) + the export_schemas round-trip drift check.
  • Regenerated schemas/cost.schema.json, dagster Pydantic (types_generated/cost_schema.py), and vscode TypeScript (types/generated/cost.ts) via just codegen.
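The rollup described above can be sketched in a few lines. This is an illustration, not the shipped Rust code: the record shape mirrors the `PerModelCostHistorical` fields named in this PR, and the choice to report a null total whenever any per-model cost is unavailable is an assumption based on the graceful-degrade behaviour described in this PR, not a confirmed detail of the implementation.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PerModelCost:
    # Field names mirror the PR description; shapes are illustrative only.
    model_name: str
    duration_ms: int
    bytes_scanned: int
    bytes_written: int
    cost_usd: Optional[float]  # None when the adapter is unbilled / config failed

def rollup(models: list[PerModelCost]) -> dict:
    totals = {
        "total_duration_ms": sum(m.duration_ms for m in models),
        "total_bytes_scanned": sum(m.bytes_scanned for m in models),
        "total_bytes_written": sum(m.bytes_written for m in models),
    }
    costs = [m.cost_usd for m in models]
    # Assumed degrade semantics: bytes and durations always roll up, but the
    # cost total is null rather than a misleading partial sum if any model's
    # cost could not be derived.
    totals["total_cost_usd"] = sum(costs) if all(c is not None for c in costs) else None
    return totals
```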

Explicitly out of scope (follow-up PRs)

  • Cost surfacing on rocky replay output — ReplayOutput, RunOutput, and every other existing Output struct are left untouched so this PR is isolated to net-new.
  • Per-model [budget] blocks (first-wave per-pipeline budgets already shipped).
  • Adapter-reported bytes_scanned plumbing on the live rocky run path (Databricks manifest / Snowflake statistics / BigQuery totalBytesProcessed).
  • PR cost-projection GitHub Action.
  • Dagster RockyResource.cost(...) method + parse_rocky_output dispatch + test fixture. The Pydantic type is reachable today via from dagster_rocky.types_generated.cost_schema import CostOutput; a full resource-method wiring is a small follow-up once the live run path starts recording RunRecords.

Note on the write side

The "live rocky run → historical rocky cost" round-trip does not work end-to-end today, because no production code path currently calls StateStore::record_run. rocky replay, rocky trace, and rocky history share the same pre-existing gap. Once the write side lands (separate PR), this command will return real production data for every recorded run. The smoke test below uses a seeded state store to exercise the read path's happy case across DuckDB, Databricks, and BigQuery.

Smoke test (seeded state store)

A 3-model run was seeded into .rocky-state.redb: orders (12s, 2.46GB scanned), customers (8s, 618MB scanned), revenue_summary (18s, 4.12GB scanned).

DuckDB (from examples/playground/pocs/00-foundations/00-playground-default)

$ rocky cost latest --output table
run: demo-run-0001
status: success
trigger: manual
started_at: 2026-04-21T13:54:31.635391+00:00
finished_at: 2026-04-21T13:55:09.635391+00:00
duration_ms: 38000
adapter_type: duckdb

models (3):
  model                               duration          rows      bytes_scan     bytes_write      cost_usd  status
  orders                                 12000         10234      2457600000      1228800000     $0.000000  success
  customers                               8000          4812       618400000       309200000     $0.000000  success
  revenue_summary                        18000           182      4123456789           12288     $0.000000  success

total_duration_ms: 38000
total_bytes_scanned: 7199456789
total_bytes_written: 1538012288
total_cost_usd: $0.000000

Databricks (same run, --config pointing at a databricks rocky.toml with warehouse_size = "Medium", compute_cost_per_dbu = 0.40)

$ rocky cost latest --output table
run: demo-run-0001
status: success
trigger: manual
duration_ms: 38000
adapter_type: databricks

models (3):
  model                               duration      cost_usd  status
  orders                                 12000     $0.032000  success
  customers                               8000     $0.021333  success
  revenue_summary                        18000     $0.048000  success

total_duration_ms: 38000
total_cost_usd: $0.101333

Math check: 12s on Medium (24 DBU/hr) @ $0.40/DBU = 12/3600 × 24 × 0.40 = $0.032 ✓

BigQuery (same run, --config pointing at [adapter.default] type = "bigquery")

$ rocky cost latest --output table
run: demo-run-0001
adapter_type: bigquery

models (3):
  model                               bytes_scan      cost_usd  status
  orders                              2457600000     $0.015360  success
  customers                            618400000     $0.003865  success
  revenue_summary                     4123456789     $0.025772  success

total_cost_usd: $0.044997

Math check: 2,457,600,000 bytes / 1e12 bytes-per-TB × $6.25 = $0.015360 ✓
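Both math checks (Databricks above, BigQuery here) reproduce in a few lines. The rates are the ones quoted in this PR's smoke test — Medium = 24 DBU/hr at $0.40/DBU, and $6.25 per TB scanned — used purely as illustration, not as the engine's actual pricing tables:

```python
def databricks_cost_usd(duration_ms: int, dbu_per_hour: float = 24,
                        usd_per_dbu: float = 0.40) -> float:
    # Duration-based billing: hours of warehouse time × DBU rate × $/DBU.
    return duration_ms / 3_600_000 * dbu_per_hour * usd_per_dbu

def bigquery_cost_usd(bytes_scanned: int, usd_per_tb: float = 6.25) -> float:
    # On-demand scan billing: terabytes scanned × $/TB (1 TB = 1e12 bytes).
    return bytes_scanned / 1e12 * usd_per_tb
```

Plugging in the seeded values recovers the table rows: 12s on Medium gives $0.032, and 2,457,600,000 scanned bytes give $0.015360.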

JSON output (DuckDB)

{
  "version": "1.11.0",
  "command": "cost",
  "run_id": "demo-run-0001",
  "status": "success",
  "trigger": "manual",
  "started_at": "2026-04-21T13:54:31.635391+00:00",
  "finished_at": "2026-04-21T13:55:09.635391+00:00",
  "duration_ms": 38000,
  "adapter_type": "duckdb",
  "total_cost_usd": 0.0,
  "total_duration_ms": 38000,
  "total_bytes_scanned": 7199456789,
  "total_bytes_written": 1538012288,
  "per_model": [
    { "model_name": "orders", "status": "success", "duration_ms": 12000, "rows_affected": 10234, "bytes_scanned": 2457600000, "bytes_written": 1228800000, "cost_usd": 0.0 },
    { "model_name": "customers", "status": "success", "duration_ms": 8000, "rows_affected": 4812, "bytes_scanned": 618400000, "bytes_written": 309200000, "cost_usd": 0.0 },
    { "model_name": "revenue_summary", "status": "success", "duration_ms": 18000, "rows_affected": 182, "bytes_scanned": 4123456789, "bytes_written": 12288, "cost_usd": 0.0 }
  ]
}

Graceful degrade (missing config)

$ rocky --config /tmp/missing.toml cost latest --output table
WARN failed to load config at /tmp/missing.toml — cost figures will be omitted: ...
adapter_type: (unavailable — config not loaded or not a billed warehouse)
...
total_cost_usd: -

--output json still produces valid JSON on stdout (warning goes to stderr via tracing).

Test plan

  • cargo test -p rocky-cli -p rocky-core — 194 + 976 tests pass
  • cargo clippy --workspace --all-targets -- -D warnings — clean
  • cargo fmt --all --check — clean
  • just codegen regenerates schema + Pydantic + TypeScript; git diff limited to the expected 8 files
  • cd integrations/dagster && uv run pytest — 307 tests pass (Pydantic regen didn't break any consumer)
  • cd editors/vscode && npm run compile — TypeScript compiles clean
  • Smoke test: DuckDB / Databricks / BigQuery configs all produce the expected cost numbers against a seeded RunRecord
  • Smoke test: missing config degrades gracefully with a warn! on stderr and cost_usd: null on stdout
  • --model <name> filter narrows the rollup to a single model; empty result errors with "run '…' did not execute model '…'"

Adds a new `rocky cost <target>` CLI verb that reads `RunRecord` from
the embedded state store and rolls per-model cost attribution up from
persisted `bytes_scanned`, `bytes_written`, and `duration_ms` values.
Re-uses `rocky_core::cost::compute_observed_cost_usd` — the same
formula `RunOutput::populate_cost_summary` applies at the end of a
live run — so the historical surface stays consistent with the
first-wave per-run summary (PR #171).

The command loads `rocky.toml` to resolve the billed-warehouse type;
when the config can't be read the output degrades gracefully to
`adapter_type: null` / `cost_usd: null`, and durations/bytes are still
emitted from the stored record. BigQuery is a quiet upgrade: because
`ModelExecution.bytes_scanned` is persisted, the historical command
returns a real cost figure for BQ runs even though the live run path
still reports `null` for BQ (adapter bytes-scanned plumbing is a
later wave).

- New `CostOutput` + `PerModelCostHistorical` in `output.rs` (deriving
  `JsonSchema`), registered in `export_schemas.rs::schemas()`.
- New `commands/cost.rs` with `run_cost` entry point and 12 unit tests
  (rollup math, empty-adapter degrade, BQ bytes path, model filter,
  by-id and latest resolution, adapter-default precedence).
- Clap subcommand `Cost { target, model }` in `rocky/src/main.rs`.
- Regenerated `schemas/cost.schema.json`, dagster Pydantic
  (`types_generated/cost_schema.py`), and vscode TypeScript
  (`types/generated/cost.ts`) via `just codegen`.

Scope notes:
- Does NOT extend `ReplayOutput`, `RunOutput`, or any existing Output
  struct — the new surface is isolated to `CostOutput`.
- Does NOT introduce per-model `[budget]` blocks or the PR
  cost-projection Action — those are later waves.
- The write side (`StateStore::record_run` being called from the live
  `rocky run` path) is a pre-existing gap shared with `rocky replay`,
  `rocky trace`, and `rocky history` — out of scope for this PR.
- Dagster resource-method wiring is deferred; the Pydantic type is
  reachable today via `types_generated.cost_schema`.
@hugocorreia90 hugocorreia90 merged commit b0b62b3 into main Apr 21, 2026
14 of 15 checks passed
@hugocorreia90 hugocorreia90 deleted the feat/arc2-rocky-cost branch April 21, 2026 14:02
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
* chore(engine): release 1.12.0

Arc 1 wave 2 + cleanup cascade. Eight PRs since v1.11.0:

- #199 SIGPIPE handler
- #200 rocky branch compare
- #201 POC target_dialect cleanup
- #202 rocky cost <run_id|latest> (Arc 2 wave 2 first PR)
- #203 rocky run persists RunRecord (Arc 1 wave 2 load-bearing fix)
- #204 docs + CHANGELOG [Unreleased] cascade
- #205 demo-branches-replay.gif refresh
- #206 real per-model started_at on MaterializationOutput

rocky history / replay / trace / cost now return real data end-to-end
for the first time. Full notes in CHANGELOG.

* feat(state): configurable transfer timeout + tracing span + Valkey wrap

- `StateConfig.transfer_timeout_seconds` (default 300s) replaces the hard-coded
  `STATE_TRANSFER_TIMEOUT`. Operators can now tune the wall-clock
  budget in `rocky.toml` for very large state or slow networks without
  recompiling. `StateConfig` gains a manual `Default` impl so
  `StateConfig::default()` yields 300s (not u64's zero).
- `state.upload` / `state.download` tracing spans wrap every transfer,
  carrying `backend`, `bucket`, and `size_bytes`. The warn event emitted when
  the timeout elapses inherits those fields automatically, so hung transfers
  are diagnosable from stderr logs alone (which dagster-rocky streams into
  the Dagster run viewer).
- Structured `warn!` on timeout elapse ("state transfer exceeded timeout
  budget") with a `duration_ms` field — replaces silent `Timeout(_)`.
- Valkey read/write paths audited and closed: `redis::Client::get_connection`
  + `redis::cmd(...).query()` are sync and blocked the tokio runtime thread;
  no outer `tokio::time::timeout` could rescue them. Both `upload_to_valkey`
  and `download_from_valkey` now run under `tokio::task::spawn_blocking`
  inside `with_transfer_timeout`, closing the same class of hang the
  object-store paths were already protected against.
- `default_client_options()` in `object_store.rs` honours the standard
  `object_store`-crate env vars `AWS_ALLOW_HTTP` / `AZURE_ALLOW_HTTP` /
  `GOOGLE_STORAGE_ALLOW_HTTP`. Always off in production; the new
  integration test uses it to point the S3 SDK at a plain-HTTP wiremock
  server without bypassing the credential chain.
- New `tests/state_sync_timeout_test.rs` integration test: a wiremock S3
  endpoint that holds PutObject for 1h proves `upload_state` returns
  `StateSyncError::Timeout` within the configured 2s budget (+grace).
  A promptly-responding endpoint serves as a negative control against regressions.
- CHANGELOG entries added under [1.12.0]. Example config in
  `engine/examples/dagster-integration/rocky.toml` surfaces the new key.

cargo fmt clean; `clippy --workspace --all-targets -- -D warnings` clean;
all 977 rocky-core unit tests + 30 e2e + 20 integration + the 2 new
timeout tests pass.
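The Valkey fix above hinges on a general pattern: a synchronous call awaited directly on an async runtime blocks the runtime thread, so no outer timeout can ever fire; moving the call onto a blocking-work thread restores cancellability of the await. The asyncio analogue of the `tokio::task::spawn_blocking`-inside-`with_transfer_timeout` wrap can be sketched as follows (`blocking_download` is a hypothetical stand-in for a sync Redis/Valkey client call, not rocky code):

```python
import asyncio
import time

def blocking_download() -> bytes:
    # Stand-in for a synchronous client call, e.g. a blocking Redis GET.
    time.sleep(0.2)
    return b"state"

async def with_transfer_timeout(fn, timeout_s: float):
    # Awaiting fn() directly would pin the event-loop thread and defeat any
    # outer timeout. Running it in a worker thread via to_thread keeps the
    # await cancellable, so wait_for can enforce the wall-clock budget --
    # the same shape as spawn_blocking inside a tokio timeout.
    return await asyncio.wait_for(asyncio.to_thread(fn), timeout_s)
```

With a 1s budget the fast call completes normally; a call that sleeps past the budget raises a timeout instead of hanging the runtime.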

* chore(codegen): regenerate schemas + pydantic types for StateConfig.transfer_timeout_seconds

* chore(fixtures): regenerate dagster test fixtures for 1.12.0

`just regen-fixtures` — version string bump only (1.11.0 → 1.12.0) across
35 captured fixtures under integrations/dagster/tests/fixtures_generated/.
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
* chore(dagster): release 1.8.0

Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade).

- #202 CostOutput + PerModelCostHistorical Pydantic bindings
- #203 OptimizeRecommendation gains 3 cost fields; HistoryResult soft-swap
- #203 Fixture normalizer: WALL_CLOCK_ID_FIELDS + derived-numerics
- #206 MaterializationOutput.started_at (real per-model wall-clock)

Full notes in integrations/dagster/CHANGELOG.md.

* fix(resource): eliminate two-readers race in run_streaming via watchdog

The previous _run_rocky_streaming implementation had the stderr
forwarder thread reading proc.stderr via TextIOWrapper iteration while
the main thread called proc.communicate(timeout=). Two concurrent
readers on the same pipe FD violates CPython's documented subprocess
contract — under stderr traffic the TimeoutExpired path intermittently
failed to fire, causing the 2026-04-18 and 2026-04-19 production hangs
(11.5h rocky run on the second incident, triggering a 58-run Dagster
queue backlog).

Rewrite to use dedicated single-reader threads per pipe + an external
watchdog that kills the process group via os.killpg(SIGKILL). The
watchdog enforces the wall-clock timeout independent of pipe-FD
semantics — traffic patterns on stderr no longer affect enforcement.

Changes in src/dagster_rocky/resource.py:
- Popen now passes start_new_session=True on POSIX so rocky gets its
  own process group (lets os.killpg reach any children rocky spawns).
- New _accumulate_stdout helper: sole reader of proc.stdout.
- Existing _forward_stderr_to_context is the sole reader of
  proc.stderr (docstring updated to call this out).
- Watchdog thread driven by threading.Event.wait(timeout_seconds); on
  timeout, calls os.killpg(SIGKILL) (POSIX) or proc.kill() (Windows)
  and sets the event. ProcessLookupError / OSError swallowed.
- Main thread calls proc.wait() with no timeout. finally block:
  fired.set() (dismiss watchdog) -> watchdog.join(1.0) ->
  stderr_reader.join(2.0) -> stdout_reader.join(2.0).
- Watchdog-fire detected via returncode == -signal.SIGKILL on POSIX;
  raises dg.Failure with description "Rocky command timed out after
  Ns (watchdog-killed)" and stderr_tail / duration_ms / pid metadata.
- Structured INFO log line on Popen success (pid, timeout_s, cmd)
  and on process exit (pid, returncode, duration_ms, outcome:
  success | failure | partial-success | timeout-killed). Closes the
  observability gap behind the prior incident post-mortems.
- All prior semantics preserved: allow_partial, partial-success JSON
  detection, version check, _build_cmd.

Audit of run_pipes (A.2): dg.PipesSubprocessClient.run() in Dagster
1.13.1 calls subprocess.Popen with stdout=stderr=None (inherit) by
default, no reader threads, and process.wait() without a timeout. No
two-readers race possible. Separate issue: pipes has no wall-clock
timeout enforcement at all (only SIGINT on Dagster cancel) — out of
scope for this release.
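The mechanism the rewrite describes — one dedicated reader per pipe, a `threading.Event`-driven watchdog, `start_new_session=True` so `os.killpg` can reach the whole process group, and `proc.wait()` with no timeout — can be sketched as below. This is a minimal illustration of the pattern, not the actual `_run_rocky_streaming` code; helper names are invented and POSIX is assumed:

```python
import os
import signal
import subprocess
import threading

def run_with_watchdog(cmd: list[str], timeout_s: float):
    # start_new_session=True gives the child its own process group, so the
    # watchdog's killpg also reaches any children it spawns.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True, start_new_session=True)
    out_chunks, err_chunks = [], []

    def read(pipe, sink):
        # Sole reader of this pipe FD, per CPython's subprocess contract.
        for line in pipe:
            sink.append(line)

    readers = [threading.Thread(target=read, args=(proc.stdout, out_chunks), daemon=True),
               threading.Thread(target=read, args=(proc.stderr, err_chunks), daemon=True)]
    for t in readers:
        t.start()

    done = threading.Event()
    timed_out = threading.Event()

    def watchdog():
        # Wall-clock enforcement independent of pipe traffic: if `done` is
        # not set within timeout_s, SIGKILL the whole process group.
        if not done.wait(timeout_s):
            timed_out.set()
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except (ProcessLookupError, OSError):
                pass  # process already gone

    wd = threading.Thread(target=watchdog, daemon=True)
    wd.start()
    try:
        proc.wait()  # no timeout here: the watchdog owns the deadline
    finally:
        done.set()          # dismiss the watchdog
        wd.join(1.0)
        for t in readers:
            t.join(2.0)
    return proc.returncode, "".join(out_chunks), timed_out.is_set()
```

Because the kill comes from outside the process via a plain timer, stderr traffic patterns cannot affect enforcement; a watchdog-fire shows up as `returncode == -signal.SIGKILL`.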

Tests (tests/test_resource.py):
- Update _streaming_popen_mock: proc.stdout now an iterator (was a
  MagicMock); proc.pid set to a realistic int. Required because the
  new stdout accumulator thread iterates proc.stdout.
- Rewrite test_run_streaming_timeout_kills_proc_and_raises: target
  proc.wait() + os.killpg mocks instead of proc.communicate raising
  TimeoutExpired. Exercises the real threading.Event.wait watchdog
  path end-to-end. Asserts os.killpg called once, Failure description
  contains "1s" and "watchdog-killed".
- Add test_run_streaming_hard_kills_hung_binary_with_stderr_chatter:
  real POSIX shell script that hangs forever spamming stderr at 20Hz
  (the exact production pattern). Asserts dg.Failure raised within
  5s of a 2s timeout. Skipped on Windows.
- Add test_run_streaming_timeout_fires_natively_without_daemon_reader:
  negative control. Same hang script, stderr forwarder monkeypatched
  to a no-op. Documents that the watchdog's effectiveness is
  pipe-FD-independent — external SIGKILL bypasses the race entirely.
- All 309 existing tests still pass; ruff check + format clean.

uv.lock picks up the 1.7.0 -> 1.8.0 version bump already committed in
1abb6a4 (release commit for 1.8.0).

* docs(dagster): CHANGELOG 1.8.0 - subprocess watchdog + race fix

Document the two-readers race fix in dagster-rocky 1.8.0:

- ### Fixed entry describes the proc.stderr two-readers race between
  the daemon forwarder and proc.communicate(timeout=), production
  impact (2026-04-18 and 2026-04-19 hangs), and the A.2 audit outcome
  for run_pipes (upstream PipesSubprocessClient is safe — inherits
  stdio by default, no reader threads, process.wait() without timeout).
- ### Added entries describe the three mechanism changes:
  - Watchdog-based timeout enforcement via threading.Event + os.killpg
    (with start_new_session=True for POSIX process-group isolation).
  - Single-reader threads for stdout and stderr restoring conformance
    with CPython's documented subprocess contract.
  - Structured start/end logging with pid, returncode, duration_ms,
    outcome — closes the observability gap behind prior post-mortems.

Entries go under [1.8.0] (not [Unreleased]) because PR #208 is the
release PR; these changes ship with the 1.8.0 publish when it merges.

* test(resource): address code-quality review on run_streaming regression tests

- drop the unused `_HANG_WITH_STDERR_CHATTER_SCRIPT` module-level constant
  (the tests construct the script inline via `_write_hang_fake`)
- replace bare `pass` in `_noop_forwarder`'s except with an explanatory
  comment + `return`, documenting that drain-only helpers swallow the
  same pipe-close / decode errors the real forwarder does
hugocorreia90 added a commit that referenced this pull request Apr 21, 2026
Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade). Regenerated
TypeScript bindings + project schema; no feature changes to the
extension itself.

- #202 CostOutput + PerModelCostHistorical TypeScript
- #203 OptimizeRecommendation cost fields
- #206 MaterializationOutput.started_at

Full notes in editors/vscode/CHANGELOG.md.
hugocorreia90 added a commit that referenced this pull request Apr 22, 2026
…#218)

Wires the `rocky cost <run_id|latest>` CLI (shipped in engine PR #202) into
the Dagster integration. Adds a `RockyResource.cost(run_id="latest")` method
that shells out and returns the generated `CostOutput` Pydantic model, plus a
unit test for the happy path (default and explicit run_id) and a live-binary
fixture captured from the playground POC.

- Re-exports `CostOutput` + `PerModelCostHistorical` from `types_generated`
  and `types.py`, registers `CostOutput` in the `RockyOutput` union and the
  `parse_rocky_output` dispatch under the `"cost"` command literal.
- `scripts/regen_fixtures.sh` now captures `fixtures_generated/cost.json`
  (the bootstrap `rocky run` at the top of the script already persists the
  RunRecord that `rocky cost latest` rolls up).
- `test_generated_fixtures.py::EXPECTED_TYPES` maps `"cost"` → `CostOutput`
  so the parse-guard covers the new fixture.
