feat(engine): stamp real per-model started_at on MaterializationOutput (#206)
Merged
hugocorreia90 merged 1 commit into main on Apr 21, 2026
Replaces the lossy reconstruction PR #203 left behind: per-model `started_at` was derived as `finished_at - duration_ms` (with every model sharing the run's `finished_at`), which collapses parallel executions to a single endpoint and mis-orders overlapping models.

Now `MaterializationOutput` carries `started_at: DateTime<Utc>` as a required field, captured with `Utc::now()` at the moment each model begins execution:

- `process_table` — `table_started_at` alongside the existing `table_start` `Instant`, used in the parallel-dispatch path.
- `execute_time_interval_partition` — `partition_started_at` alongside `partition_start`, used per partition.
- `run_snapshot` — `started_at` alongside `start`, used by the transformation pipeline path.

`RunOutput::to_run_record` now uses each materialization's `started_at` directly and derives the per-model `finished_at` as `started_at + duration_ms`. Parallel runs preserve their real ordering on the persisted `RunRecord`; `rocky replay` / `rocky trace` / `rocky history` / `rocky cost` all read correct wall-clock windows instead of collapsed finish-relative ones.

The existing `WALL_CLOCK_FIELDS` set in `scripts/_normalize_fixture.py` already includes `started_at`, so the new field gets sentinelled to `2000-01-01T00:00:00Z` in regenerated fixtures — byte-stable across invocations. Confirmed across two consecutive `just regen-fixtures` runs on the same machine.
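The ordering collapse and its fix can be illustrated with a small sketch; the timestamps and durations below are invented for illustration, not taken from any actual run:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical run: model "a" starts first but finishes quickly;
# model "b" starts later and runs longer. The run itself ends at 12:00:30.
run_finished_at = datetime(2026, 4, 21, 12, 0, 30, tzinfo=timezone.utc)
models = [
    {"name": "a", "started_at": datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc), "duration_ms": 10_000},
    {"name": "b", "started_at": datetime(2026, 4, 21, 12, 0, 2, tzinfo=timezone.utc), "duration_ms": 25_000},
]

# Old lossy reconstruction: every model shares the run's finished_at,
# and started_at is back-derived from it.
old = [
    (m["name"],
     run_finished_at - timedelta(milliseconds=m["duration_ms"]),
     run_finished_at)
    for m in models
]

# New derivation: real per-model started_at, finished_at = started_at + duration_ms.
new = [
    (m["name"],
     m["started_at"],
     m["started_at"] + timedelta(milliseconds=m["duration_ms"]))
    for m in models
]

old_order = [n for n, start, _ in sorted(old, key=lambda t: t[1])]
new_order = [n for n, start, _ in sorted(new, key=lambda t: t[1])]
```

Under the old scheme, sorting by derived `started_at` yields `["b", "a"]`, the reverse of the real dispatch order, and every window ends at the same instant; the new derivation restores `["a", "b"]` with distinct, overlapping windows.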
## Test plan

- [x] `cargo test -p rocky-cli -p rocky-core` — 205 + 976 tests pass
- [x] `cargo clippy --workspace --all-targets -- -D warnings` clean
- [x] `cargo fmt --all --check` clean
- [x] `just codegen` regenerated schemas + Pydantic + TypeScript
- [x] `just regen-fixtures` byte-stable across two regens
- [x] `uv run pytest` in integrations/dagster — 307 tests pass
- [x] `npm run compile` in editors/vscode — clean
- [x] End-to-end: `rocky run` → `rocky replay latest` shows a real wall-clock `started_at` distinct from the run's `finished_at`

## What's deferred

- Real per-model `finished_at` — currently derived from `started_at + duration_ms`. For strict fidelity the runtime could stamp `Utc::now()` at the actual completion site, but that's two sources of truth for an identity that already holds. Keeping the derivation means a single field drives the invariant.
- `bytes_scanned` / `bytes_written` plumbing — still `None` on `MaterializationOutput` (adapter-layer work). Same follow-up called out on `populate_cost_summary`.
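The fixture sentinelling mentioned above can be sketched as follows. The `WALL_CLOCK_FIELDS` name and the `2000-01-01T00:00:00Z` sentinel come from the commit message; the recursive walk is a hypothetical stand-in for the real logic in `scripts/_normalize_fixture.py`, and the inclusion of `finished_at` in the set is an assumption:

```python
# Hypothetical stand-in for the normalizer in scripts/_normalize_fixture.py.
# Field names and sentinel value are taken from the commit message above;
# finished_at's membership in the set is assumed.
WALL_CLOCK_FIELDS = {"started_at", "finished_at"}
SENTINEL = "2000-01-01T00:00:00Z"

def normalize(node):
    """Recursively replace wall-clock fields with a fixed sentinel so
    regenerated fixtures are byte-stable across invocations."""
    if isinstance(node, dict):
        return {
            k: SENTINEL if k in WALL_CLOCK_FIELDS else normalize(v)
            for k, v in node.items()
        }
    if isinstance(node, list):
        return [normalize(v) for v in node]
    return node
```

Because the sentinel is constant, two regeneration runs over the same inputs produce byte-identical fixtures regardless of when they execute.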
This was referenced Apr 21, 2026
hugocorreia90 added a commit that referenced this pull request on Apr 21, 2026
* chore(engine): release 1.12.0

  Arc 1 wave 2 + cleanup cascade. Eight PRs since v1.11.0:
  - #199 SIGPIPE handler
  - #200 rocky branch compare
  - #201 POC target_dialect cleanup
  - #202 rocky cost <run_id|latest> (Arc 2 wave 2 first PR)
  - #203 rocky run persists RunRecord (Arc 1 wave 2 load-bearing fix)
  - #204 docs + CHANGELOG [Unreleased] cascade
  - #205 demo-branches-replay.gif refresh
  - #206 real per-model started_at on MaterializationOutput

  rocky history / replay / trace / cost now return real data end-to-end for the first time. Full notes in CHANGELOG.

* feat(state): configurable transfer timeout + tracing span + Valkey wrap

  - `StateConfig.transfer_timeout_seconds` (default 300s) replaces the hard-coded `STATE_TRANSFER_TIMEOUT`. Operators can now tune the wall-clock budget in `rocky.toml` for very large state or slow networks without recompiling. `StateConfig` gains a manual `Default` impl so `StateConfig::default()` yields 300s (not u64's zero).
  - `state.upload` / `state.download` tracing spans wrap every transfer, carrying `backend`, `bucket`, and `size_bytes`. The in-elapse warn event inherits those fields automatically, so hung transfers are diagnosable from stderr logs alone (which dagster-rocky streams into the Dagster run viewer).
  - Structured `warn!` on timeout elapse ("state transfer exceeded timeout budget") with a `duration_ms` field — replaces silent `Timeout(_)`.
  - Valkey read/write paths audited and closed: `redis::Client::get_connection` + `redis::cmd(...).query()` are sync and blocked the tokio runtime thread; no outer `tokio::time::timeout` could rescue them. Both `upload_to_valkey` and `download_from_valkey` now run under `tokio::task::spawn_blocking` inside `with_transfer_timeout`, closing the same class of hang the object-store paths were already protected against.
  - `default_client_options()` in `object_store.rs` honours the standard `object_store`-crate env vars `AWS_ALLOW_HTTP` / `AZURE_ALLOW_HTTP` / `GOOGLE_STORAGE_ALLOW_HTTP`. Always off in production; the new integration test uses it to front-end the S3 SDK with a plain-HTTP wiremock server without bypassing the credential chain.
  - New `tests/state_sync_timeout_test.rs` integration test: a wiremock S3 endpoint that holds PutObject for 1h proves `upload_state` returns `StateSyncError::Timeout` within the configured 2s budget (+ grace). A prompt-endpoint negative control guards against regressions.
  - CHANGELOG entries added under [1.12.0]. Example config in `engine/examples/dagster-integration/rocky.toml` surfaces the new key.

  cargo fmt clean; `clippy --workspace --all-targets -- -D warnings` clean; all 977 rocky-core unit tests + 30 e2e + 20 integration + the 2 new timeout tests pass.

* chore(codegen): regenerate schemas + pydantic types for `StateConfig.transfer_timeout_seconds`

* chore(fixtures): regenerate dagster test fixtures for 1.12.0

  `just regen-fixtures` — version string bump only (1.11.0 → 1.12.0) across 35 captured fixtures under integrations/dagster/tests/fixtures_generated/.
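The Valkey fix above, moving synchronous client calls onto a blocking pool so an outer wall-clock timeout can actually fire, has a direct Python analogue. This is a hedged sketch of the pattern (the real code uses tokio's `spawn_blocking` inside `with_transfer_timeout`; the function and helper names below are invented):

```python
import asyncio
import time

def blocking_download(delay_s: float) -> bytes:
    """Stand-in for a synchronous client call (e.g. a blocking Valkey read)."""
    time.sleep(delay_s)
    return b"state-bytes"

async def with_transfer_timeout(fn, timeout_s: float):
    # Run the sync call on the executor's thread pool so the event loop stays
    # free, then bound it with a wall-clock budget. Calling fn() directly on
    # the loop would block timeout enforcement entirely, which is the class
    # of hang the commit message describes.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(loop.run_in_executor(None, fn), timeout_s)

# Fast call completes within budget.
result = asyncio.run(with_transfer_timeout(lambda: blocking_download(0.01), 1.0))

# Slow call is cut off by the wall-clock budget instead of hanging.
timed_out = False
try:
    asyncio.run(with_transfer_timeout(lambda: blocking_download(1.0), 0.1))
except (asyncio.TimeoutError, TimeoutError):
    timed_out = True
```

As in the Rust case, the timeout fires even though the underlying blocking call cannot itself be interrupted; the caller regains control and the orphaned worker finishes in the background.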
hugocorreia90 added a commit that referenced this pull request on Apr 21, 2026
* chore(dagster): release 1.8.0

  Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade).
  - #202 CostOutput + PerModelCostHistorical Pydantic bindings
  - #203 OptimizeRecommendation gains 3 cost fields; HistoryResult soft-swap
  - #203 Fixture normalizer: WALL_CLOCK_ID_FIELDS + derived-numerics
  - #206 MaterializationOutput.started_at (real per-model wall-clock)

  Full notes in integrations/dagster/CHANGELOG.md.

* fix(resource): eliminate two-readers race in run_streaming via watchdog

  The previous `_run_rocky_streaming` implementation had the stderr forwarder thread reading `proc.stderr` via `TextIOWrapper` iteration while the main thread called `proc.communicate(timeout=)`. Two concurrent readers on the same pipe FD violate CPython's documented subprocess contract — under stderr traffic the `TimeoutExpired` path intermittently failed to fire, causing the 2026-04-18 and 2026-04-19 production hangs (an 11.5h `rocky run` in the second incident, triggering a 58-run Dagster queue backlog).

  Rewritten to use dedicated single-reader threads per pipe plus an external watchdog that kills the process group via `os.killpg(SIGKILL)`. The watchdog enforces the wall-clock timeout independent of pipe-FD semantics — traffic patterns on stderr no longer affect enforcement.

  Changes in src/dagster_rocky/resource.py:
  - `Popen` now passes `start_new_session=True` on POSIX so rocky gets its own process group (lets `os.killpg` reach any children rocky spawns).
  - New `_accumulate_stdout` helper: sole reader of `proc.stdout`.
  - Existing `_forward_stderr_to_context` is the sole reader of `proc.stderr` (docstring updated to call this out).
  - Watchdog thread driven by `threading.Event.wait(timeout_seconds)`; on timeout, calls `os.killpg(SIGKILL)` (POSIX) or `proc.kill()` (Windows) and sets the event. `ProcessLookupError` / `OSError` swallowed.
  - Main thread calls `proc.wait()` with no timeout. The `finally` block runs `fired.set()` (dismiss watchdog) → `watchdog.join(1.0)` → `stderr_reader.join(2.0)` → `stdout_reader.join(2.0)`.
  - Watchdog fire detected via `returncode == -signal.SIGKILL` on POSIX; raises `dg.Failure` with description "Rocky command timed out after Ns (watchdog-killed)" and `stderr_tail` / `duration_ms` / `pid` metadata.
  - Structured INFO log line on `Popen` success (pid, timeout_s, cmd) and on process exit (pid, returncode, duration_ms, outcome: success | failure | partial-success | timeout-killed). Closes the observability gap behind the prior incident post-mortems.
  - All prior semantics preserved: `allow_partial`, partial-success JSON detection, version check, `_build_cmd`.

  Audit of run_pipes (A.2): `dg.PipesSubprocessClient.run()` in Dagster 1.13.1 calls `subprocess.Popen` with `stdout=stderr=None` (inherit) by default, no reader threads, and `process.wait()` without a timeout. No two-readers race possible. Separate issue: pipes has no wall-clock timeout enforcement at all (only SIGINT on Dagster cancel) — out of scope for this release.

  Tests (tests/test_resource.py):
  - Update `_streaming_popen_mock`: `proc.stdout` is now an iterator (was a `MagicMock`); `proc.pid` set to a realistic int. Required because the new stdout accumulator thread iterates `proc.stdout`.
  - Rewrite `test_run_streaming_timeout_kills_proc_and_raises`: target `proc.wait()` + `os.killpg` mocks instead of `proc.communicate` raising `TimeoutExpired`. Exercises the real `threading.Event.wait` watchdog path end-to-end. Asserts `os.killpg` is called once and the `Failure` description contains "1s" and "watchdog-killed".
  - Add `test_run_streaming_hard_kills_hung_binary_with_stderr_chatter`: a real POSIX shell script that hangs forever spamming stderr at 20 Hz (the exact production pattern). Asserts `dg.Failure` is raised within 5s of a 2s timeout. Skipped on Windows.
  - Add `test_run_streaming_timeout_fires_natively_without_daemon_reader`: negative control. Same hang script, stderr forwarder monkeypatched to a no-op. Documents that the watchdog's effectiveness is pipe-FD-independent — external SIGKILL bypasses the race entirely.

  All 309 existing tests still pass; ruff check + format clean. uv.lock picks up the 1.7.0 → 1.8.0 version bump already committed in 1abb6a4 (release commit for 1.8.0).

* docs(dagster): CHANGELOG 1.8.0 — subprocess watchdog + race fix

  Document the two-readers race fix in dagster-rocky 1.8.0:
  - "### Fixed" entry describes the proc.stderr two-readers race between the daemon forwarder and `proc.communicate(timeout=)`, the production impact (2026-04-18 and 2026-04-19 hangs), and the A.2 audit outcome for run_pipes (upstream PipesSubprocessClient is safe — inherits stdio by default, no reader threads, `process.wait()` without timeout).
  - "### Added" entries describe the three mechanism changes:
    - Watchdog-based timeout enforcement via `threading.Event` + `os.killpg` (with `start_new_session=True` for POSIX process-group isolation).
    - Single-reader threads for stdout and stderr, restoring conformance with CPython's documented subprocess contract.
    - Structured start/end logging with pid, returncode, duration_ms, outcome — closes the observability gap behind prior post-mortems.

  Entries go under [1.8.0] (not [Unreleased]) because PR #208 is the release PR; these changes ship with the 1.8.0 publish when it merges.

* test(resource): address code-quality review on run_streaming regression tests

  - Drop the unused `_HANG_WITH_STDERR_CHATTER_SCRIPT` module-level constant (the tests construct the script inline via `_write_hang_fake`).
  - Replace the bare `pass` in `_noop_forwarder`'s except clause with an explanatory comment + `return`, documenting that drain-only helpers swallow the same pipe-close / decode errors the real forwarder does.
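The watchdog mechanism described above can be sketched in isolation. This is a hedged illustration of the pattern (Event-driven watchdog, process-group kill, single-reader thread), not the actual `_run_rocky_streaming` code, and it assumes a POSIX host:

```python
import os
import signal
import subprocess
import sys
import threading

def run_with_watchdog(cmd: list[str], timeout_s: float) -> tuple[int, str]:
    """Run cmd under a wall-clock watchdog; return (returncode, stdout)."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        start_new_session=True,  # own process group, so killpg reaches children
    )
    fired = threading.Event()
    chunks: list[str] = []

    def read_stdout() -> None:
        # Sole reader of proc.stdout: no second reader ever touches this FD.
        for line in proc.stdout:
            chunks.append(line)

    def watchdog() -> None:
        # Event.wait returns False only if the budget elapsed un-dismissed.
        if not fired.wait(timeout_s):
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except (ProcessLookupError, OSError):
                pass  # process already gone

    reader = threading.Thread(target=read_stdout, daemon=True)
    dog = threading.Thread(target=watchdog, daemon=True)
    reader.start()
    dog.start()
    try:
        proc.wait()  # no timeout: enforcement is external to pipe semantics
    finally:
        fired.set()  # dismiss the watchdog on every exit path
        dog.join(1.0)
        reader.join(2.0)
    return proc.returncode, "".join(chunks)
```

A hung child comes back with `returncode == -signal.SIGKILL`, which is how a caller can distinguish a watchdog kill from a normal failure; stderr traffic never influences whether the timeout fires.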
hugocorreia90 added a commit that referenced this pull request on Apr 21, 2026
Tracks engine 1.12.0 (Arc 1 wave 2 + cleanup cascade). Regenerated TypeScript bindings + project schema; no feature changes to the extension itself.
- #202 CostOutput + PerModelCostHistorical TypeScript
- #203 OptimizeRecommendation cost fields
- #206 MaterializationOutput.started_at

Full notes in editors/vscode/CHANGELOG.md.
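The regenerated bindings make `started_at` a required field on the nested `MaterializationOutput`. A hedged sketch of what that contract implies on the consuming side, using a plain dataclass rather than the actual generated Pydantic/TypeScript types, with an abbreviated, assumed field set:

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class MaterializationOutput:
    # Abbreviated, assumed shape for illustration; the real generated type
    # carries more fields. started_at is required: no default is provided.
    model: str
    duration_ms: int
    started_at: datetime

def parse(payload: dict) -> MaterializationOutput:
    """Parse one materialization record; raises KeyError if started_at is absent."""
    return MaterializationOutput(
        model=payload["model"],
        duration_ms=payload["duration_ms"],
        # RFC 3339 timestamp with an explicit UTC offset.
        started_at=datetime.fromisoformat(payload["started_at"]),
    )
```

Because the field is required rather than optional, a record missing `started_at` fails at parse time instead of silently round-tripping a `None` into downstream replay/trace/cost tooling.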
## Summary

Closes the remaining unblocked Arc 1 wave 2 item from TODO.md §1. PR #203 captured run-level `started_at` / `finished_at` but lossily reconstructed per-model windows as `finished_at = run.finished_at`, `started_at = finished_at - duration_ms` — every model shared the same `finished_at` and parallel execution ordering was collapsed.

This PR stamps a real wall-clock `DateTime<Utc>` on each model at the moment execution begins, propagates it through `MaterializationOutput`, and uses it to build honest per-model windows on the persisted `RunRecord`.

## Changes

- `MaterializationOutput::started_at: DateTime<Utc>` (required). Captured with `Utc::now()` at the three sites that construct a materialization:
  - `process_table` — parallel-dispatch path (`table_started_at` alongside the `table_start` `Instant`).
  - `execute_time_interval_partition` — per-partition (`partition_started_at` alongside `partition_start`).
  - `run_snapshot` — transformation pipeline (`started_at` alongside `start`).
- `RunOutput::to_run_record` now uses each materialization's `started_at` directly and derives `finished_at` as `started_at + duration_ms`. Parallel runs preserve their real ordering; the old finish-relative collapse is gone.
- `mat()` helpers in `output.rs` now take an explicit `started_at`. The rewritten `to_run_record_uses_per_model_started_at` test exercises the parallel-overlap case: two models with staggered starts and overlapping windows end up with correct, distinct windows on the `RunRecord`.

## Downstream cascade
- `schemas/run.schema.json`, `types_generated/run_schema.py`, `src/types/generated/run.ts` all regenerated — `started_at` added as required in the nested `MaterializationOutput`.
- `just regen-fixtures` picked up the new field on 11 fixtures; the normalizer's existing `WALL_CLOCK_FIELDS` handles it (sentinelled to `2000-01-01T00:00:00Z`). Byte-stable across two consecutive regens.

## End-to-end check
Pre-PR, the `started_at` would have mirrored the run's own `finished_at`.

## Test plan
- [x] `cargo test -p rocky-cli -p rocky-core` — 205 + 976 tests pass
- [x] `cargo clippy --workspace --all-targets -- -D warnings` clean
- [x] `cargo fmt --all --check` clean
- [x] `just codegen` regenerated; `codegen-drift` CI will confirm
- [x] `just regen-fixtures` byte-stable across two regens
- [x] `uv run pytest` in integrations/dagster/ — 307 tests pass
- [x] `npm run compile` in editors/vscode/ — clean
- [x] `rocky run` → `rocky replay latest` shows a real wall-clock `started_at`

## What's still deferred
- `finished_at` — derived from `started_at + duration_ms`. Keeping the derivation means a single source of truth; adding a second stamp at completion would double-count the invariant. Can flip later if strict fidelity becomes important.
- `bytes_scanned` / `bytes_written` — still `None` on `MaterializationOutput` (adapter-layer plumbing). Same follow-up called out on `populate_cost_summary`.

Completes TODO.md §1 "Real per-model start timestamps" — the last unblocked Arc 1 wave 2 item. Warehouse-native clone and replay re-execution remain blocked on external prerequisites (sandbox access, Exp 4).