feat(engine): FR-004 rocky run --idempotency-key dedup (Phase 1+2+3) #235
Merged
hugocorreia90 merged 2 commits into `main` on Apr 23, 2026
Adds `rocky run --idempotency-key <KEY>` — caller-supplied opaque key that
dedups a run against prior runs with the same key. Three outcomes:
- Seen, succeeded → exit 0 with `status = "SkippedIdempotent"` and prior
`run_id` surfaced as `skipped_by_run_id`; no work done.
- Seen, in-flight → exit 0 with `status = "SkippedInFlight"`.
- Unseen (or prior InFlight past TTL — crashed-pod corpse) → proceed
normally, stamp terminal state at every exit.
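
The three outcomes above can be read as a small classification over the stored entry for a key. The following is only a sketch under assumptions: the `Entry` record, the `classify` function, and the `"Proceed"` label are hypothetical names for illustration, not the engine's actual types; `AdoptStale`, the two skip statuses, and the 24h TTL come from this PR.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Entry:
    """Hypothetical stored record for one idempotency key."""
    run_id: str
    state: str            # "InFlight", "Succeeded", or "Failed"
    stamped_at: datetime  # when the state was written


def classify(entry: Optional[Entry], now: datetime,
             in_flight_ttl: timedelta = timedelta(hours=24),
             dedup_on: str = "success") -> str:
    """Return the verdict for a new invocation carrying the same key."""
    if entry is None:
        return "Proceed"                      # unseen key: run normally
    if entry.state == "Succeeded":
        return "SkippedIdempotent"            # prior run already did the work
    if entry.state == "InFlight":
        if now - entry.stamped_at > in_flight_ttl:
            return "AdoptStale"               # crashed-pod corpse: take over the claim
        return "SkippedInFlight"              # another run still holds the claim
    # Failed entries only survive under dedup_on = "any".
    return "SkippedIdempotent" if dedup_on == "any" else "Proceed"


now = datetime(2026, 4, 23, 12, 0, tzinfo=timezone.utc)
fresh = Entry("run-1", "InFlight", now - timedelta(hours=1))
stale = Entry("run-1", "InFlight", now - timedelta(hours=25))
print(classify(fresh, now), classify(stale, now))
```

Both skip verdicts map to exit 0; only `Proceed` and `AdoptStale` lead to actual work.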
Defence-in-depth below Dagster's `run_key`; catches pod retries, Kafka
re-delivery, webhook duplicates, cron races. Works for non-Dagster
callers (CI, cron, webhooks) as well.
All three phases of the implementation plan
(`~/Developer/rocky-plans/plans/rocky-fr-004-idempotency-key.md`) land
together:
- Phase 1 (Option A, local + valkey + tiered): `state.redb.lock` + redb
write txn on local; `SET NX EX` on Valkey / Tiered. `rocky-core`
gains an `idempotency` module with the backend dispatch and a per-
backend atomic claim primitive.
- Phase 2 (S3): `PutMode::Create` → `If-None-Match: "*"` via the
`object_store` crate. New `ObjectStoreProvider::put_if_not_exists`.
- Phase 3 (GCS): same mechanism, `x-goog-if-generation-match: 0`.
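
All three phases rely on the same idea: an atomic create-if-absent where exactly one caller wins. As a rough analogy only (this uses the local filesystem's exclusive-create mode, not the `object_store` crate or Valkey), the semantics look like the sketch below. The Python function reuses the `put_if_not_exists` name for readability; its signature and return strings are assumptions.

```python
import json
import tempfile
from pathlib import Path


def put_if_not_exists(path: Path, payload: dict) -> str:
    """Create `path` only if it is absent and report whether this caller won."""
    try:
        # Mode "x" is an exclusive create: it raises if the file already exists,
        # which is the filesystem counterpart of a conditional PUT.
        with open(path, "x", encoding="utf-8") as f:
            json.dump(payload, f)
        return "Created"
    except FileExistsError:
        return "AlreadyExists"


with tempfile.TemporaryDirectory() as tmp:
    claim = Path(tmp) / "claim.json"
    first = put_if_not_exists(claim, {"run_id": "run-1", "state": "InFlight"})
    second = put_if_not_exists(claim, {"run_id": "run-2", "state": "InFlight"})
    print(first, second)
```

The loser of the race never overwrites the winner's payload, so the stored `run_id` always identifies the run that holds the claim.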
Design decisions locked per FR discussion:
- Verbatim key storage (debuggability > privacy; CLI help + Dagster
docstring warn against secrets in keys).
- `dedup_on = "success"` default; `"any"` optional. Failed runs under
`"success"` delete the entry so retries can reclaim.
- 30-day retention default, swept during the existing state-upload
cadence (no new cron). 24h `in_flight_ttl_hours` reaps crashed-pod
InFlight stamps via `AdoptStale`.
- In-flight lock via each backend's native primitive — one source of
truth per backend, no new services.
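
A minimal sketch of how the `dedup_on` policy could shape the terminal stamp, assuming an in-memory dict stands in for the state store. The `finalize` function and the entry layout are hypothetical; the delete-on-failure behaviour under `"success"` and the keep-on-failure behaviour under `"any"` are the ones described above.

```python
def finalize(store: dict, key: str, run_id: str, succeeded: bool,
             dedup_on: str = "success") -> None:
    """Stamp the terminal state for `key`, or release the claim."""
    entry = store.get(key)
    if entry is None or entry["run_id"] != run_id:
        return  # not our claim (for example, it was adopted): leave it alone
    if succeeded:
        entry["state"] = "Succeeded"
    elif dedup_on == "any":
        entry["state"] = "Failed"   # failures also dedup future runs
    else:
        del store[key]              # "success" policy: let a retry reclaim the key


store = {"nightly": {"run_id": "run-1", "state": "InFlight"}}
finalize(store, "nightly", "run-1", succeeded=False)
print(store)  # the failed run's entry is gone under the default policy
```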
Schema surface (via `just codegen` cascade):
- `RunStatus` gains `SkippedIdempotent` + `SkippedInFlight` variants
(rippled to Pydantic + TS).
- `RunOutput` gains `status`, `skipped_by_run_id`, `idempotency_key`
fields.
- `[state.idempotency]` TOML block with `retention_days`, `dedup_on`,
`in_flight_ttl_hours`.
- `redb` schema version bumped 4 → 5 (new `IDEMPOTENCY_KEYS` table;
replicates by default, unlike `schema_cache`).
Guardrails:
- `--idempotency-key` + `--resume{,-latest}` is a clap-level error —
resume is an explicit override and must never be short-circuited.
- S3/GCS object path uses `sha256(key)` so caller key shape can't
break path safety; original key stored verbatim inside the payload.
- DAG sub-runs ignore the outer key to avoid sibling pipelines
short-circuiting on a single stamp.
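
The path-safety guardrail can be illustrated as follows. This is a sketch: the `state/idempotency/` layout, the `.json` suffix, and both function names are assumptions made for the example; only the use of `sha256(key)` for the path and the verbatim key in the payload come from this PR.

```python
import hashlib


def object_path(prefix: str, key: str) -> str:
    """Derive a fixed-length, path-safe object name from an arbitrary key."""
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return f"{prefix}/idempotency/{digest}.json"


def claim_payload(key: str, run_id: str) -> dict:
    """The original key is kept verbatim inside the payload for debugging."""
    return {"key": key, "run_id": run_id, "state": "InFlight"}


hostile = "../../etc/passwd?x=1 "
print(object_path("state", hostile))
print(claim_payload(hostile, "run-1")["key"] == hostile)
```

Because the digest is always 64 hex characters, slashes, dots, whitespace, or very long keys cannot change the shape of the object path.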
Dagster: `run()` / `run_streaming()` / `run_pipes()` gain
`idempotency_key: str | None = None`. `_build_run_args` threads
`--idempotency-key` to the subprocess. Docstring carries the "no
secrets" warning.
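
A minimal sketch of the argument threading, assuming a standalone helper rather than the real `_build_run_args` (whose signature is not shown in this PR). The `base_args` parameter and the example key are hypothetical.

```python
from typing import Optional


def build_run_args(base_args: list[str],
                   idempotency_key: Optional[str] = None) -> list[str]:
    """Append --idempotency-key only when the caller supplied a key.

    Keys are stored verbatim, so callers should never put secrets in them.
    """
    args = list(base_args)  # copy so the caller's list is not mutated
    if idempotency_key is not None:
        args += ["--idempotency-key", idempotency_key]
    return args


print(build_run_args(["rocky", "run"], idempotency_key="nightly-2026-04-23"))
```

Leaving the default at `None` keeps existing callers unchanged: no key means no flag, and the run proceeds without dedup.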
POC: `examples/playground/pocs/05-orchestration/09-idempotency-key/`
demonstrates the flag acceptance + skip verdict. Matches the POC-04
pattern (DuckDB POCs exit non-zero on "no discovery adapter" —
orthogonal; the dedup primitive is validated independently).
Tests: 24 new tests across `rocky_core::idempotency::tests` (claim
dispatch, stale adoption, DedupPolicy branching, object-store path
stability) and `rocky_core::state::tests` (redb round-trips, sweep
semantics, concurrent-claim determinism). All 1046 rocky-core tests
pass; 374 dagster pytest tests pass; vscode tsc clean.
Known follow-up (F1 in plan §11): a `rocky run` that errors before
`persist_run_record()` leaves its InFlight entry until the 24h
`in_flight_ttl_hours` sweep reaps it. Worst-case latency for a
retried crashed run with the same key is 24h. Either extract
`run_inner()` or add a `tokio::spawn` drop guard on `IdempotencyCtx`
to tighten — dedicated follow-up PR.
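
The shape of the proposed tightening, where an outer wrapper catches the error and finalizes, can be sketched with a context manager. This is a Python analogy under the default `dedup_on = "success"` policy with an in-memory dict as the store; `in_flight_claim` is a hypothetical name, and the real fix would live in the Rust `run()` path.

```python
from contextlib import contextmanager


@contextmanager
def in_flight_claim(store: dict, key: str, run_id: str):
    """Hold an InFlight claim for the body; release it if the body raises."""
    store[key] = {"run_id": run_id, "state": "InFlight"}
    try:
        yield
    except BaseException:
        # Release the claim right away so a retry does not wait for the TTL sweep.
        store.pop(key, None)
        raise
    else:
        store[key]["state"] = "Succeeded"


store: dict = {}
try:
    with in_flight_claim(store, "nightly", "run-1"):
        raise RuntimeError("failed before persist_run_record()")
except RuntimeError:
    pass
print(store)  # the claim was released, so a retry can proceed immediately
```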
`just regen-fixtures` captures the new `status` / `skipped_by_run_id` / `idempotency_key` fields on every `run` output fixture. All 13 run fixtures gain `"status": "Success"`; the two skip fields stay absent (serde skip_serializing_if on `None`). No behavioural change. Closes the `codegen-drift` CI failure on PR #235.
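
To make the fixture change concrete, the following sketch mimics the "omit when `None`" serialization described above. The `run_output` helper and the sample values are hypothetical; real `RunOutput` payloads carry many more fields.

```python
import json
from typing import Optional


def run_output(status: str,
               skipped_by_run_id: Optional[str] = None,
               idempotency_key: Optional[str] = None) -> str:
    """Serialize the three new fields, dropping the optional ones when unset."""
    fields = {
        "status": status,
        "skipped_by_run_id": skipped_by_run_id,
        "idempotency_key": idempotency_key,
    }
    return json.dumps({k: v for k, v in fields.items() if v is not None})


print(run_output("Success"))
print(run_output("SkippedIdempotent", "run-1", "nightly-2026-04-23"))
```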
hugocorreia90 added a commit that referenced this pull request on Apr 23, 2026

Coordinated bundle since the last release (engine-v1.14.0 / dagster-v1.10.0 / vscode-v1.6.4, 2026-04-23 morning):

- **#235 feat(engine): FR-004 rocky run --idempotency-key dedup (Phase 1+2+3)**: caller-supplied opaque key that dedups a run against prior runs via the state store's native atomic primitive (redb write-txn for local, Valkey SET NX EX for valkey/tiered, PutMode::Create for s3, If-Generation-Match for gcs). New [state.idempotency] TOML block (30d retention default, dedup_on="success", 24h in-flight TTL). RunStatus enum gains SkippedIdempotent/SkippedInFlight; RunOutput gains status/skipped_by_run_id/idempotency_key fields. Dagster RockyResource.run*() gain idempotency_key kwarg. POC at examples/playground/pocs/05-orchestration/09-idempotency-key/. redb schema bump 4 -> 5 (additive, new IDEMPOTENCY_KEYS table).
- **#234 chore(deps): bump rand from 0.8.5 to 0.8.6**: routine dependabot update.

Version bumps:

- Engine: 20 Cargo.toml (19 engine crates + rocky-lsp) 1.14.0 -> 1.15.0
- Dagster: pyproject.toml 1.10.0 -> 1.11.0
- VS Code: package.json 1.6.4 -> 1.7.0

Regenerated:

- JSON schemas (schemas/*.schema.json) for new fields/enum variants
- Pydantic types (integrations/dagster/src/dagster_rocky/types_generated/)
- TS types (editors/vscode/src/types/generated/)
- rocky-project.schema.json mirror (editors/vscode/schemas/)
- Dagster test fixtures (tests/fixtures_generated/, 13 run fixtures gain status="Success")
- vscode package-lock.json (version sync)
hugocorreia90 added a commit that referenced this pull request on Apr 23, 2026

…ation dispatches (FR-004 F2) (#239)

Before this change, a `rocky run --idempotency-key K` against a Transformation / Quality / Snapshot / Load pipeline that returned `Ok(())` left its `InFlight` claim in place. The four non-replication dispatch arms delegated straight to `run_local::run_*` / `load::run_load` and returned without calling `finalize_idempotency`; the error-path wrapper shipped in #237 only fires on `is_err()`. A retry with the same key inside `in_flight_ttl_hours` (default 24h) then returned `skipped_in_flight` for up to 24h instead of `skipped_idempotent`.

Each arm now stamps `Succeeded` on its happy-path exit via a new `finalize_idempotency_on_success` helper that mirrors `finalize_idempotency_on_error`: one-shot `.take()` on the shared `IdempotencyCtx`, opens its own `StateStore` from the canonical `state_path` owned by `run()`, best-effort on error. The one-shot semantics keep the outer error-path wrapper a no-op once the success path has drained the ctx. Replication was never affected; it already finalized correctly at its main exit in #235.

Tested by driving `run()` end-to-end against a transformation pipeline with `--idempotency-key` set and asserting the redb entry is `Succeeded`, not `InFlight`. Pre-fix, the assertion fails with `InFlight != Succeeded`. Existing F1 coverage (`finalize_on_error_releases_inflight_claim_allowing_immediate_retry`, `finalize_on_error_is_one_shot_and_idempotent_when_ctx_already_taken`, `finalize_idempotency_is_one_shot_take`) continues to pass; the F1 wrapper remains safely idempotent against the new success-path finalize.
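
The one-shot `.take()` behaviour described in this commit can be sketched as below. This is a Python analogy with hypothetical names (`CtxSlot`, the dict-based store, the boolean return values); the real helpers are the Rust `finalize_idempotency_on_success` / `finalize_idempotency_on_error` functions, whose signatures are not shown here.

```python
from typing import Optional


class CtxSlot:
    """Holds the idempotency context until the first finalizer takes it."""

    def __init__(self, ctx: Optional[dict]) -> None:
        self._ctx = ctx

    def take(self) -> Optional[dict]:
        ctx, self._ctx = self._ctx, None  # later calls see an empty slot
        return ctx


def finalize_on_success(slot: CtxSlot, store: dict) -> bool:
    ctx = slot.take()
    if ctx is None:
        return False  # already finalized elsewhere
    store[ctx["key"]] = {"run_id": ctx["run_id"], "state": "Succeeded"}
    return True


def finalize_on_error(slot: CtxSlot, store: dict) -> bool:
    ctx = slot.take()
    if ctx is None:
        return False  # success path drained the ctx: nothing to release
    store.pop(ctx["key"], None)
    return True


store = {"nightly": {"run_id": "run-1", "state": "InFlight"}}
slot = CtxSlot({"key": "nightly", "run_id": "run-1"})
print(finalize_on_success(slot, store), finalize_on_error(slot, store))
print(store["nightly"]["state"])
```

Whichever finalizer runs first wins, so the outer error wrapper cannot undo a `Succeeded` stamp.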
Summary

Adds `rocky run --idempotency-key <KEY>`: a caller-supplied opaque key that dedups a run against prior runs with the same key. All three phases of the implementation plan ship together so reviewers see the full mechanism in one surface:

- Phase 1 (`local` + `valkey` + `tiered`): redb write txn inside `state.redb.lock` on local; Valkey `SET NX EX` on `{prefix}:idempotency:<key>` on Valkey/Tiered.
- Phase 2 (`s3`): `object_store::PutMode::Create` → `If-None-Match: "*"` on the S3 `PutObject` call.
- Phase 3 (`gcs`): same primitive, `x-goog-if-generation-match: 0` precondition.

Three outcomes per invocation:

- Unseen: proceeds normally and stamps `Succeeded` / `Failed` at every terminal exit.
- Seen, succeeded (or `Failed` under `dedup_on = any`): `status` in JSON is `SkippedIdempotent`; `skipped_by_run_id` = prior `run_id`.
- Seen, in-flight: `status` in JSON is `SkippedInFlight`; `skipped_by_run_id` = in-flight `run_id`.

Defence-in-depth below Dagster's `run_key`. Catches pod retries, Kafka re-delivery, webhook duplicates, cron races. Also usable from non-Dagster callers (CI, cron, webhooks, custom orchestrators).

Design decisions locked

Per the plan in `~/Developer/rocky-plans/plans/rocky-fr-004-idempotency-key.md`:

- `dedup_on = "success"` default (failed runs delete their entry so retries claim cleanly; `"any"` opts in to dedup on any terminal status).
- 30-day retention default, swept during the existing `state_sync::upload_state` cadence; no new cron.
- 24h `in_flight_ttl_hours`: a reader encountering a stale `InFlight` adopts via `AdoptStale`.

Schema cascade (`just codegen`)

- `RunStatus` (`rocky-core/src/state.rs`) gains `SkippedIdempotent` + `SkippedInFlight` variants; rippled to Pydantic (`run_schema.py`) and TypeScript (`run.ts`).
- `RunOutput` (`rocky-cli/src/output.rs`) gains `status: RunStatus`, `skipped_by_run_id: Option<String>`, `idempotency_key: Option<String>`.
- `[state.idempotency]` TOML block with `retention_days`, `dedup_on`, `in_flight_ttl_hours`.
- redb schema version bumped 4 → 5 (new `IDEMPOTENCY_KEYS` table; NOT in `LOCAL_ONLY_TABLE_NAMES`, so it replicates across pods for the tiered backend).

Guardrails

- `--idempotency-key` + `--resume{,-latest}` is a hard error at flag parse; resume is an explicit override and must never be short-circuited.
- S3/GCS object path uses `sha256(key)` for safety + bounded length; original key stored verbatim inside the payload.

Dagster

`run()` / `run_streaming()` / `run_pipes()` all gain `idempotency_key: str | None = None`. `_build_run_args` threads `--idempotency-key` to the subprocess. Docstring carries the secrets warning.

Tests

- `IdempotencyCheck` classification, `DedupPolicy` branching, object-store path stability, backend label resolution.
- Finalize-failed + `Success` policy deletes, finalize-failed + `Any` policy keeps, finalize with mismatched run_id is a no-op, sweep removes past-retention + stale-InFlight entries, concurrent claim is deterministic.
- `put_if_not_exists` returns `Created` on first call, `AlreadyExists` on second.
- `tsc --noEmit` clean.
- `cargo clippy --all-targets -- -D warnings` clean.
- `cargo fmt --check` clean.
- `just codegen` idempotent (no drift after rerun).

POC

`examples/playground/pocs/05-orchestration/09-idempotency-key/`: self-contained DuckDB POC demonstrating the flag acceptance + short-circuit. `./run.sh` verifies: `status = SkippedInFlight` / `SkippedIdempotent` + prior `run_id` in `skipped_by_run_id`.

Follows the POC-04 pattern (DuckDB POCs exit non-zero on "no discovery adapter", which is orthogonal to the dedup primitive being validated).

Known follow-up

F1 in the plan §11: today a run that errors before `persist_run_record()` leaves its InFlight entry until the 24h `in_flight_ttl_hours` sweep reaps it. Worst-case latency for retrying a crashed run with the same key is 24h. Tighten by extracting `run_inner()` (so the outer `run()` can catch errors and finalize) or adding a `tokio::spawn`-based drop guard on `IdempotencyCtx`. Dedicated follow-up PR.

Test plan

- `cargo test -p rocky-core`: 1046 pass
- `pytest integrations/dagster/tests/`: 374 pass
- `examples/playground/pocs/05-orchestration/09-idempotency-key/run.sh`, confirm Run 2 short-circuits with a skip status and surfaces Run 1's `run_id`

References

- `~/Developer/rocky-plans/feature-requests/gold-fr-004-rocky-run-idempotency-key.md`
- `~/Developer/rocky-plans/plans/rocky-fr-004-idempotency-key.md`
- `~/Developer/rocky-plans/archive/backlog-reprioritization-2026-04-22.md`