feat(integrations/dagster): wave 1 — Pipes execution mode + strict doctor on startup#224
Merged
hugocorreia90 merged 2 commits into main, Apr 22, 2026
Two Python-only dagster-rocky features that users can opt into
independently — default behaviour is unchanged on upgrade.
RockyComponent: execution_mode = "streaming" (default) | "pipes"
The component-backed multi-asset now supports the Dagster Pipes
protocol as a first-class execution mode. When the attribute is
set to "pipes", each rocky run invocation goes through
RockyResource.run_pipes and the engine emits
MaterializationEvent / AssetCheckEvaluation entries directly over
the Pipes wire. The buffered JSON post-processing path is skipped
in this mode — Pipes is the single source of truth for run events.
Asset-key translation and subset filtering happen at the reader
layer via a new RockyPipesMessageReader. That's a subclass of
PipesTempFileMessageReader that wraps the handler with a small
proxy; the proxy rewrites the wire asset_key (a slash-joined path
of [source_type, *components, table]) to the Dagster key the
component declared via its translator, and drops events whose
resolved key is outside the subset selection. Catching events at
the reader layer (not post-filtering .get_results()) means events
for unselected tables never reach Dagster's handler, so nothing
leaks through and nothing races.
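The reader-layer interception described above can be sketched without Dagster. What follows is a minimal, stdlib-only illustration and not the code from this PR: `HandlerProxy` is a hypothetical name, the message shape (a dict with `method` and `params["asset_key"]`) is assumed for the sketch, and resolved keys are represented as slash-joined strings, whereas the real `asset_key_fn` returns a `dg.AssetKey`.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


class HandlerProxy:
    """Hypothetical stand-in for the handler proxy (not the dagster-rocky class)."""

    def __init__(
        self,
        inner: Any,
        asset_key_fn: Callable[[List[str]], str],
        include_keys: Optional[Iterable[str]] = None,
    ) -> None:
        self._inner = inner
        self._asset_key_fn = asset_key_fn
        # None means "no subset filter"; an empty collection filters everything.
        self._include_keys = None if include_keys is None else set(include_keys)

    def handle_message(self, message: Dict[str, Any]) -> None:
        params = message.get("params")
        wire_key = params.get("asset_key") if isinstance(params, dict) else None
        # Non-asset messages and malformed keys are forwarded untouched so the
        # inner handler performs its own validation.
        if not isinstance(wire_key, str) or not wire_key:
            self._inner.handle_message(message)
            return
        # The wire key is a slash-joined path: source_type/components.../table.
        resolved = self._asset_key_fn(wire_key.split("/"))
        if self._include_keys is not None and resolved not in self._include_keys:
            return  # outside the subset selection: drop before the handler sees it
        rewritten = {**message, "params": {**params, "asset_key": resolved}}
        self._inner.handle_message(rewritten)

    def __getattr__(self, name: str) -> Any:
        # Only reached for attributes the proxy does not define itself, so every
        # other handler method is delegated to the wrapped handler.
        return getattr(self._inner, name)
```

Because the proxy sits in front of the handler, a dropped event is never recorded at all, which is the property the post-filtering alternative cannot offer.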
RockyResource: strict_doctor + strict_doctor_checks
Opt-in strict rocky doctor gate at resource startup via
setup_for_execution. Default strict_doctor=False means doctor is
never invoked at startup — zero cold-start cost for users who
don't opt in. When enabled:
* Empty strict_doctor_checks → fail on any critical check.
* Non-empty → fail only on listed critical checks; others are
logged as warnings so operators still see them.
* Warning severity never fails, regardless of the list.
The two-field shape (bool + list[str]) was chosen over a single
tri-state Union field because Dagster's Pydantic config surface
doesn't accept Union[list, Literal] types.
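The three strictness rules above reduce to a small decision function. The sketch below is an illustration under stated assumptions, not the resource's actual implementation: `CheckResult`, `evaluate_strict_doctor`, the severity labels `"critical"` and `"warning"`, and the `adapter` check name are all hypothetical. In the real resource, doctor is not invoked at all when `strict_doctor` is False; here the results are passed in so the logic can be shown in isolation.

```python
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Tuple


@dataclass(frozen=True)
class CheckResult:
    name: str
    severity: str  # assumed labels: "critical" or "warning"


def evaluate_strict_doctor(
    results: Iterable[CheckResult],
    strict_doctor: bool = False,
    strict_doctor_checks: Sequence[str] = (),
) -> Tuple[List[str], List[str]]:
    """Return (failing, warned) check names for a set of doctor results."""
    if not strict_doctor:
        return [], []  # gate disabled: nothing fails, nothing is logged
    listed = set(strict_doctor_checks)
    failing: List[str] = []
    warned: List[str] = []
    for result in results:
        if result.severity != "critical":
            continue  # warning severity never fails, regardless of the list
        if not listed or result.name in listed:
            failing.append(result.name)  # empty list means "any critical check"
        else:
            warned.append(result.name)  # critical but not listed: log only
    return failing, warned
```

A caller would raise when `failing` is non-empty and log each name in `warned`, which keeps unlisted critical checks visible to operators without blocking startup.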
Design decisions
* Custom PipesMessageReader subclass (not post-filter) — trust-grade
correctness over iteration speed. The interception point is a
private API (PipesFileMessageReader._reader_thread); if that
contract changes, the dagster bump will surface it before release.
* Per-check strictness via an allowlist (not a policy DSL) — the
simplest surface that exposes the tolerant-vs-strict choice
without forcing a default flip on upgrade.
* run_pipes(asset_key_fn=..., include_keys=...) — the reader is
only constructed when either kwarg is non-None, so the default
Pipes path is identical to canonical Dagster.
* Pipes-mode contract-check results are still sourced from compile
diagnostics (build-time signal the Pipes wire doesn't carry).
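The "only constructed when either kwarg is non-None" rule for `run_pipes` can be illustrated in a few lines. This is a sketch under assumptions: `select_message_reader` and `reader_factory` are hypothetical names, and returning `None` stands for "use the default Pipes reader". The point worth noting is that the test is `is None`, so an explicitly empty `include_keys` still builds the custom reader.

```python
from typing import Any, Callable, Optional, Sequence


def select_message_reader(
    asset_key_fn: Optional[Callable[..., Any]],
    include_keys: Optional[Sequence[Any]],
    reader_factory: Callable[..., Any],
) -> Optional[Any]:
    """Build a custom reader only when translation or filtering was requested."""
    if asset_key_fn is None and include_keys is None:
        return None  # default path: no custom reader is constructed
    # An empty include_keys is not None, so it still yields a filtering reader.
    return reader_factory(asset_key_fn=asset_key_fn, include_keys=include_keys)
```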
Wave 2 follow-up: RockyResource.state_health() (FR-003).
Tests
* tests/test_pipes_mode.py — 17 tests covering handler-proxy key
translation, include_keys filtering, non-asset message
passthrough, malformed-asset_key defensiveness, framework
exception __getattr__ delegation, run_pipes kwarg plumbing, and
the component's asset_key_fn builder (including drift-event
last-segment fallback).
* tests/test_strict_doctor.py — 13 tests covering disabled default,
fail-on-any-critical, allowlist matches, warnings never failing,
binary-failure wrapping under strict mode, and component
pass-through.
uv run pytest: 342 passed (up from 312).
uv run ruff check + format: clean.
Closes FR-001 and FR-006.
Addresses two github-code-quality bot comments on #224: the test helpers passed `asset_key_fn=lambda path: dg.AssetKey(path)`, which is just a wrapper around `dg.AssetKey` itself — pass it directly. No behaviour change; asset_key_fn still receives a list[str] and returns an AssetKey.
hugocorreia90 added a commit that referenced this pull request on Apr 22, 2026
* chore: release engine-v1.14.0 + dagster-v1.10.0 + vscode-v1.6.4
  Bumps all three artifacts to cover the 16-PR cascade since
  engine-v1.13.0 / dagster-v1.9.0 / vscode-v1.6.3. Details in each
  CHANGELOG.
  Engine headlines (12 PRs):
  - Arc 7 wave 2 complete: cached DESCRIBE end-to-end (#223 infra,
    #228 reads, #230 write tap, #231 discover warm-up, #232 state
    controls + --cache-ttl override)
  - Arc 2 wave 3 complete: bytes_scanned / bytes_written on
    MaterializationOutput (#219 BQ, #221 Databricks, #220 Snowflake
    deferred doc, #222 docstring cascade). Real $ on rocky cost for
    BQ + Databricks
  - FR-005 Unity Catalog workspace-binding reconcile (#226)
  - FR-002 Fivetran connector metadata via SourceOutput.metadata (#225)
  - Housekeeping: compute_backoff dedup into rocky_core::retry (#217)
  Dagster headlines (4 PRs):
  - FR-001 RockyComponent Pipes execution mode + FR-006 strict doctor
    on RockyResource startup (#224)
  - FR-003 RockyResource.state_health() (#227) + FR follow-up
    threading doctor(check=state_rw) for sub-second probes (#229)
  - RockyResource.cost() wiring + fixture (#218)
  VS Code: regenerated TS bindings for engine 1.14.0 type additions.
  No extension feature changes.
* chore(integrations/dagster): regenerate test fixtures for engine 1.14.0
  36 fixtures picked up the new engine version string in their
  top-level "version" field. No schema changes, just the version bump.
Summary
Two Python-only dagster-rocky opt-in features, bundled into one
coordinated release. Default behaviour is unchanged on upgrade for
both — every existing user gets the current buffered/streaming
execution path and the current tolerant doctor behaviour with no
config change required.
FR-001: `RockyComponent.execution_mode: Literal["streaming", "pipes"]`
The component-backed multi-asset now supports the Dagster Pipes
protocol as a first-class execution mode. When `execution_mode` is
set to `"pipes"`, each `rocky run` invocation goes through
`RockyResource.run_pipes` and the engine emits
`MaterializationEvent` / `AssetCheckEvaluation` entries directly
over the Pipes wire. The buffered `_emit_results` JSON
post-processing path is skipped in this mode, so Pipes becomes the
single source of truth for run events (no double-emit).
Asset-key translation and subset filtering happen at the reader
layer via a new `RockyPipesMessageReader`: a subclass of
`PipesTempFileMessageReader` that wraps the handler with a small
proxy. The proxy rewrites the wire `asset_key` (a slash-joined path
of `[source_type, *components, table]`) to the Dagster key the
component declared via its translator, and drops events whose
resolved key is outside the subset selection.
Opt-in per-component:
FR-006: `RockyResource.strict_doctor` + `strict_doctor_checks`
Opt-in strict `rocky doctor` gate at resource startup via
`setup_for_execution`. Tri-state surface via two fields:
| `strict_doctor` | `strict_doctor_checks` | Behaviour |
| --- | --- | --- |
| `False` (default) | (not consulted) | doctor is never invoked at startup |
| `True` | `[]` | fail on any critical check |
| `True` | `["state_rw", "auth"]` | fail only on the listed critical checks |
Warnings (non-critical severities) never raise regardless of the
list. Critical-but-not-listed checks are surfaced via
`context.log.warning` so operators still see them. Both fields
pass through from `RockyComponent` to the constructed resource.
Opt-in per-component:
Design decisions
* Custom `PipesMessageReader` subclass, not post-filter.
  Translation + filtering happen at the reader layer so events for
  unselected tables never reach Dagster's handler in the first
  place. The interception point is private
  (`PipesFileMessageReader._reader_thread` calls
  `handler.handle_message`); if that contract shifts, the dagster
  bump in CI will surface the breakage before release.
* Two-field shape (`bool` + `list[str]`). The FR doc's optimistic
  `Literal["none", "all"] | list[str]` shape doesn't round-trip
  through Dagster's Pydantic config serializer
  (`DagsterInvalidPythonicConfigDefinitionError` on
  `Union[list, Literal]`). Two fields cover the same three semantic
  cases cleanly and type-check.
* Contract-check results are still sourced from compile diagnostics
  in pipes mode. Contract violations are a build-time signal the
  Pipes wire doesn't carry, so the component keeps emitting them
  post-run in both execution modes.
* `run_pipes` only wraps the client when translation/filter kwargs
  are passed. Users calling `run_pipes` directly (not via
  `RockyComponent`) get the unchanged canonical Dagster Pipes path.
Test plan
* `tests/test_pipes_mode.py`: 17 new tests covering:
  * Handler-proxy key translation for materialization and check events.
  * `include_keys` filtering at the handler layer.
  * Non-asset messages (`log`, `opened`, `closed`) pass through
    untouched.
  * Malformed `asset_key` payloads forwarded to the inner handler
    for validation.
  * `report_pipes_framework_exception` delegation via `__getattr__`
    (private-API safety net).
  * `run_pipes` kwarg plumbing: custom reader only when
    `asset_key_fn` or `include_keys` is supplied; caller-supplied
    client wins.
  * `RockyComponent.execution_mode` default is `"streaming"`.
  * `_run_filters_pipes` builds an `asset_key_fn` from the group's
    `rocky_key_to_dagster_key` map, with last-segment fallback for
    drift events.
* `tests/test_strict_doctor.py`: 13 new tests covering:
  * `strict_doctor=False` (default) is inert.
  * `strict_doctor=True` + empty list fails on any critical.
  * Allowlist matches fail; non-listed critical checks surface as
    warnings.
  * Warnings never fail (even under fail-on-any).
  * Binary failures are wrapped in `dg.Failure`.
  * `RockyComponent` forwards both fields to the resource factory.
* `uv run pytest -q` → 342 passed (up from 312; 30 new, 0
  regressions).
* `uv run ruff check src/ tests/` → clean.
* `uv run ruff format --check src/ tests/` → clean.
* … needed.
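The `asset_key_fn` builder with last-segment fallback mentioned in the test plan is not shown in this description. The sketch below is one plausible reading and should be treated as an assumption throughout: `build_asset_key_fn` is a hypothetical helper, the map is assumed to be keyed by tuples of path segments, keys are plain lists of strings rather than `dg.AssetKey`, and the rule "fall back only when exactly one entry shares the last segment" is a choice made here for the illustration, not confirmed behaviour.

```python
from typing import Callable, Dict, List, Mapping, Sequence, Tuple


def build_asset_key_fn(
    rocky_key_to_dagster_key: Mapping[Tuple[str, ...], List[str]],
) -> Callable[[Sequence[str]], List[str]]:
    """Return a function mapping a wire path to a declared key (hypothetical)."""
    # Index declared keys by the last path segment (the table name).
    by_last: Dict[str, List[List[str]]] = {}
    for rocky_key, dagster_key in rocky_key_to_dagster_key.items():
        if rocky_key:
            by_last.setdefault(rocky_key[-1], []).append(list(dagster_key))

    def asset_key_fn(path: Sequence[str]) -> List[str]:
        key = tuple(path)
        exact = rocky_key_to_dagster_key.get(key)
        if exact is not None:
            return list(exact)  # full path matches a declared key
        candidates = by_last.get(key[-1], []) if key else []
        if len(candidates) == 1:
            return list(candidates[0])  # unambiguous last-segment fallback
        return list(path)  # unknown or ambiguous: leave the path unchanged

    return asset_key_fn
```

Leaving ambiguous paths unchanged means a downstream `include_keys` filter decides their fate, rather than the builder guessing between two declared assets.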
Wave 2 follow-up
`RockyResource.state_health()` (FR-003), a programmatic state-backend
health probe, ships separately once this lands.