Skip to content

feat(integrations/dagster): wave 1 — Pipes execution mode + strict doctor on startup#224

Merged
hugocorreia90 merged 2 commits intomainfrom
feat/dagster-wave1-pipes-strict-doctor
Apr 22, 2026
Merged

feat(integrations/dagster): wave 1 — Pipes execution mode + strict doctor on startup#224
hugocorreia90 merged 2 commits intomainfrom
feat/dagster-wave1-pipes-strict-doctor

Conversation

@hugocorreia90
Copy link
Copy Markdown
Contributor

Summary

Two Python-only dagster-rocky opt-in features, bundled into one
coordinated release. Default behaviour is unchanged on upgrade for
both — every existing user gets the current buffered/streaming
execution path and the current tolerant doctor behaviour with no
config change required.

FR-001 — RockyComponent.execution_mode: Literal["streaming", "pipes"]

The component-backed multi-asset now supports the Dagster Pipes
protocol as a first-class execution mode. When execution_mode is
set to "pipes", each rocky run invocation goes through
RockyResource.run_pipes and the engine emits
MaterializationEvent / AssetCheckEvaluation entries directly
over the Pipes wire. The buffered _emit_results JSON
post-processing path is skipped in this mode — Pipes becomes the
single source of truth for run events (no double-emit).

Asset-key translation and subset filtering happen at the reader
layer via a new RockyPipesMessageReader: a subclass of
PipesTempFileMessageReader that wraps the handler with a small
proxy. The proxy rewrites the wire asset_key (a slash-joined path
of [source_type, *components, table]) to the Dagster key the
component declared via its translator, and drops events whose
resolved key is outside the subset selection.

Opt-in per-component:

type: dagster_rocky.RockyComponent
attributes:
  execution_mode: pipes

FR-006 — RockyResource.strict_doctor + strict_doctor_checks

Opt-in strict rocky doctor gate at resource startup via
setup_for_execution. Tri-state surface via two fields:

strict_doctor strict_doctor_checks Behaviour
False (default) Doctor not invoked at startup. Zero cold-start cost.
True [] Fail on any critical check.
True ["state_rw", "auth"] Fail only on listed critical checks; others log as warnings.

Warnings (non-critical severities) never raise regardless of the
list. Critical-but-not-listed checks are surfaced via
context.log.warning so operators still see them. Both fields
pass through from RockyComponent to the constructed resource.

Opt-in per-component:

type: dagster_rocky.RockyComponent
attributes:
  strict_doctor: true
  strict_doctor_checks: [state_rw]

Design decisions

  • Custom PipesMessageReader subclass, not post-filter.
    Translation + filtering happen at the reader layer so events for
    unselected tables never reach Dagster's handler in the first
    place. The interception point is private
    (PipesFileMessageReader._reader_thread calls
    handler.handle_message); if that contract shifts, the dagster
    bump in CI will surface the breakage before release.
  • Two-field strictness surface (bool + list[str]). The FR
    doc's optimistic Literal["none", "all"] | list[str] shape
    doesn't round-trip through Dagster's Pydantic config serializer
    (DagsterInvalidPythonicConfigDefinitionError on Union[list, Literal]). Two fields cover the same three semantic cases
    cleanly and type-check.
  • Contract checks still sourced from compile diagnostics in
    pipes mode.
    Contract violations are a build-time signal the
    Pipes wire doesn't carry, so the component keeps emitting them
    post-run in both execution modes.
  • run_pipes only wraps the client when translation/filter
    kwargs are passed.
    Users calling run_pipes directly (not via
    RockyComponent) get the unchanged canonical Dagster Pipes path.

Test plan

  • tests/test_pipes_mode.py — 17 new tests covering:
    • Handler-proxy asset-key translation for materializations and
      check events.
    • include_keys filtering at the handler layer.
    • Non-asset messages (log, opened, closed) pass through
      untouched.
    • Malformed asset_key payloads forwarded to the inner handler
      for validation.
    • report_pipes_framework_exception delegation via __getattr__
      (private-API safety net).
    • run_pipes kwarg plumbing: custom reader only when
      asset_key_fn or include_keys is supplied; caller-supplied
      client wins.
    • RockyComponent.execution_mode default is "streaming".
    • _run_filters_pipes builds an asset_key_fn from the group's
      rocky_key_to_dagster_key map, with last-segment fallback for
      drift events.
  • tests/test_strict_doctor.py — 13 new tests covering:
    • Disabled mode (default) skips doctor entirely; a broken binary
      is inert.
    • strict_doctor=True + empty list fails on any critical.
    • Allowlist matches: only listed critical checks raise;
      non-listed critical checks surface as warnings.
    • Warning severity never fails (even when listed; even under
      fail-on-any).
    • Binary failures under strict mode wrap into a startup
      dg.Failure.
    • RockyComponent forwards both fields to the resource factory.
  • uv run pytest -q342 passed (up from 312; 30 new, 0
    regressions).
  • uv run ruff check src/ tests/ → clean.
  • uv run ruff format --check src/ tests/ → clean.
  • No engine-side changes; no codegen regen; no fixture refresh
    needed.

Wave 2 follow-up

RockyResource.state_health() (FR-003) — programmatic state-backend
health probe — ships separately once this lands.

…ctor on startup

Two Python-only dagster-rocky features that users can opt into
independently — default behaviour is unchanged on upgrade.

RockyComponent: execution_mode = "streaming" (default) | "pipes"

  The component-backed multi-asset now supports the Dagster Pipes
  protocol as a first-class execution mode. When the attribute is
  set to "pipes", each rocky run invocation goes through
  RockyResource.run_pipes and the engine emits
  MaterializationEvent / AssetCheckEvaluation entries directly over
  the Pipes wire. The buffered JSON post-processing path is skipped
  in this mode — Pipes is the single source of truth for run events.

  Asset-key translation and subset filtering happen at the reader
  layer via a new RockyPipesMessageReader. That's a subclass of
  PipesTempFileMessageReader that wraps the handler with a small
  proxy; the proxy rewrites the wire asset_key (a slash-joined path
  of [source_type, *components, table]) to the Dagster key the
  component declared via its translator, and drops events whose
  resolved key is outside the subset selection. Catching events at
  the reader layer (not post-filtering .get_results()) is the
  architecturally correct shape — nothing leaks through, nothing
  races.

RockyResource: strict_doctor + strict_doctor_checks

  Opt-in strict rocky doctor gate at resource startup via
  setup_for_execution. Default strict_doctor=False means doctor is
  never invoked at startup — zero cold-start cost for users who
  don't opt in. When enabled:

  * Empty strict_doctor_checks → fail on any critical check.
  * Non-empty → fail only on listed critical checks; others are
    logged as warnings so operators still see them.
  * Warning severity never fails, regardless of the list.

  The two-field shape (bool + list[str]) was chosen over a single
  tri-state Union field because Dagster's Pydantic config surface
  doesn't accept Union[list, Literal] types.

Design decisions

* Custom PipesMessageReader subclass (not post-filter) — trust-grade
  correctness over iteration speed. The interception point is a
  private API (PipesFileMessageReader._reader_thread); if that
  contract changes, the dagster bump will surface it before release.
* Per-check strictness via an allowlist (not a policy DSL) — the
  simplest surface that exposes the tolerant-vs-strict choice
  without forcing a default flip on upgrade.
* run_pipes(asset_key_fn=..., include_keys=...) — the reader is
  only constructed when either kwarg is non-None, so the default
  Pipes path is identical to canonical Dagster.
* Pipes-mode contract-check results are still sourced from compile
  diagnostics (build-time signal the Pipes wire doesn't carry).

Wave 2 follow-up: RockyResource.state_health() (FR-003).

Tests

* tests/test_pipes_mode.py — 17 tests covering handler-proxy key
  translation, include_keys filtering, non-asset message
  passthrough, malformed-asset_key defensiveness, framework
  exception __getattr__ delegation, run_pipes kwarg plumbing, and
  the component's asset_key_fn builder (including drift-event
  last-segment fallback).
* tests/test_strict_doctor.py — 13 tests covering disabled default,
  fail-on-any-critical, allowlist matches, warnings never failing,
  binary-failure wrapping under strict mode, and component
  pass-through.

uv run pytest: 342 passed (up from 312).
uv run ruff check + format: clean.

Closes FR-001 and FR-006.
Comment thread integrations/dagster/tests/test_pipes_mode.py Fixed
Comment thread integrations/dagster/tests/test_pipes_mode.py Fixed
Addresses two github-code-quality bot comments on #224: the test
helpers passed `asset_key_fn=lambda path: dg.AssetKey(path)`, which is
just a wrapper around `dg.AssetKey` itself — pass it directly.

No behaviour change; asset_key_fn still receives a list[str] and returns
an AssetKey.
@hugocorreia90 hugocorreia90 merged commit d712d4b into main Apr 22, 2026
8 checks passed
@hugocorreia90 hugocorreia90 deleted the feat/dagster-wave1-pipes-strict-doctor branch April 22, 2026 16:03
hugocorreia90 added a commit that referenced this pull request Apr 22, 2026
* chore: release engine-v1.14.0 + dagster-v1.10.0 + vscode-v1.6.4

Bumps all three artifacts to cover the 16-PR cascade since engine-v1.13.0
/ dagster-v1.9.0 / vscode-v1.6.3. Details in each CHANGELOG.

Engine headlines (12 PRs):
- Arc 7 wave 2 wave-2 complete — cached DESCRIBE end-to-end
  (#223 infra, #228 reads, #230 write tap, #231 discover warm-up,
  #232 state controls + --cache-ttl override)
- Arc 2 wave 3 complete — bytes_scanned / bytes_written on
  MaterializationOutput (#219 BQ, #221 Databricks, #220 Snowflake
  deferred doc, #222 docstring cascade). Real $ on rocky cost for
  BQ + Databricks
- FR-005 Unity Catalog workspace-binding reconcile (#226)
- FR-002 Fivetran connector metadata via SourceOutput.metadata (#225)
- Housekeeping: compute_backoff dedup into rocky_core::retry (#217)

Dagster headlines (4 PRs):
- FR-001 RockyComponent Pipes execution mode + FR-006 strict doctor
  on RockyResource startup (#224)
- FR-003 RockyResource.state_health() (#227) + FR follow-up threading
  doctor(check=state_rw) for sub-second probes (#229)
- RockyResource.cost() wiring + fixture (#218)

VS Code: regenerated TS bindings for engine 1.14.0 type additions.
No extension feature changes.

* chore(integrations/dagster): regenerate test fixtures for engine 1.14.0

36 fixtures picked up the new engine version string in their top-level
"version" field. No schema changes — just the version bump.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant