
feat(engine): FR-004 rocky run --idempotency-key dedup (Phase 1+2+3) #235

Merged
hugocorreia90 merged 2 commits into main from feat/fr-004-idempotency-key on Apr 23, 2026
@hugocorreia90
Contributor

Summary

Adds rocky run --idempotency-key <KEY> — caller-supplied opaque key that dedups a run against prior runs with the same key. All three phases of the implementation plan ship together so reviewers see the full mechanism in one surface:

  • Phase 1 (Option A — local + valkey + tiered): redb write txn inside state.redb.lock on local; Valkey SET NX EX on {prefix}:idempotency:<key> on Valkey/Tiered.
  • Phase 2 (s3): object_store::PutMode::Create → If-None-Match: "*" on the S3 PutObject call.
  • Phase 3 (gcs): same primitive, x-goog-if-generation-match: 0 precondition.
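
All three phases rely on the same contract: a create-only write that succeeds for exactly one caller. The following is a minimal in-memory sketch of that contract, not Rocky's implementation. `InMemoryStore` and `PutResult` are hypothetical names, the key string is illustrative, and the TTL handling models only the Valkey `SET NX EX` variant (the object-store preconditions carry no expiry of their own).

```python
import time
from enum import Enum


class PutResult(Enum):
    CREATED = "Created"
    ALREADY_EXISTS = "AlreadyExists"


class InMemoryStore:
    """Toy stand-in for a backend with a create-only write (hypothetical)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}  # key -> (value, expires_at)

    def put_if_not_exists(self, key, value, ttl_seconds):
        now = self._clock()
        existing = self._entries.get(key)
        # An unexpired entry blocks the write; an expired one counts as absent,
        # which mirrors the EX part of SET NX EX.
        if existing is not None and existing[1] > now:
            return PutResult.ALREADY_EXISTS
        self._entries[key] = (value, now + ttl_seconds)
        return PutResult.CREATED


store = InMemoryStore()
first = store.put_if_not_exists("rocky:idempotency:nightly", "run-1", 3600)
second = store.put_if_not_exists("rocky:idempotency:nightly", "run-2", 3600)
print(first.value, second.value)
```

The first caller sees `Created` and proceeds; every later caller with the same key sees `AlreadyExists` and reads the stored entry to decide which skip status applies.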

Three outcomes per invocation:

| Outcome | status in JSON | What happens |
|---|---|---|
| Unseen key (or stale InFlight corpse) | proceeds normally | Stamps Succeeded / Failed at every terminal exit |
| Seen + succeeded (or Failed under dedup_on = any) | SkippedIdempotent | Exit 0, no work, skipped_by_run_id = prior run_id |
| Seen + in-flight within TTL | SkippedInFlight | Exit 0, no work, skipped_by_run_id = in-flight run_id |
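
The outcome table can be read as a small classification function. The sketch below is one illustrative reading of it. `Verdict`, `Entry` and `classify` are hypothetical names, and treating an entry as stale once its age is greater than or equal to the TTL is an assumption (the PR does not state the boundary).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class Verdict(Enum):
    PROCEED = "Proceed"
    ADOPT_STALE = "AdoptStale"
    SKIPPED_IDEMPOTENT = "SkippedIdempotent"
    SKIPPED_IN_FLIGHT = "SkippedInFlight"


@dataclass
class Entry:
    run_id: str
    state: str  # "InFlight", "Succeeded", or "Failed"
    stamped_at: datetime


def classify(entry, now, dedup_on="success", in_flight_ttl=timedelta(hours=24)):
    """Return (verdict, skipped_by_run_id) for a stored entry, or None if unseen."""
    if entry is None:
        return Verdict.PROCEED, None
    if entry.state == "InFlight":
        # Assumption: an entry at or past the TTL is a crashed-pod corpse.
        if now - entry.stamped_at >= in_flight_ttl:
            return Verdict.ADOPT_STALE, None
        return Verdict.SKIPPED_IN_FLIGHT, entry.run_id
    if entry.state == "Succeeded" or dedup_on == "any":
        return Verdict.SKIPPED_IDEMPOTENT, entry.run_id
    # A Failed entry under dedup_on="success" is claimable again.
    return Verdict.PROCEED, None


now = datetime(2026, 4, 23, 12, tzinfo=timezone.utc)
fresh = Entry("run-a", "InFlight", now - timedelta(hours=1))
stale = Entry("run-b", "InFlight", now - timedelta(hours=25))
print(classify(fresh, now)[0].value, classify(stale, now)[0].value)
```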

Defence-in-depth below Dagster's run_key. Catches pod retries, Kafka re-delivery, webhook duplicates, cron races. Also usable from non-Dagster callers (CI, cron, webhooks, custom orchestrators).

Design decisions locked

Per the plan in ~/Developer/rocky-plans/plans/rocky-fr-004-idempotency-key.md:

  • Verbatim key storage (debuggability over privacy; CLI help + Dagster docstring carry the "no secrets in keys" warning).
  • dedup_on = "success" default (failed runs delete their entry so retries claim cleanly; "any" opts in to dedup on any terminal status).
  • 30-day retention default, swept during the existing state_sync::upload_state cadence — no new cron.
  • 24h in_flight_ttl_hours — reader encountering a stale InFlight adopts via AdoptStale.
  • In-flight lock via each backend's native primitive — one source of truth per backend, no new services.
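
The finalize and sweep rules above can be modelled over a plain dictionary. This is a sketch under stated assumptions, not the redb-backed code: `finalize` and `sweep` are hypothetical names, the dictionary layout is illustrative, and the `>=` comparison at the retention boundary is assumed.

```python
from datetime import datetime, timedelta, timezone


def finalize(entries, key, run_id, succeeded, dedup_on="success"):
    """Stamp the terminal state for a claim owned by run_id."""
    entry = entries.get(key)
    if entry is None or entry["run_id"] != run_id:
        return  # a mismatched run_id is a no-op
    if succeeded:
        entry["state"] = "Succeeded"
    elif dedup_on == "any":
        entry["state"] = "Failed"
    else:
        del entries[key]  # failed under "success": free the key for retries


def sweep(entries, now, retention=timedelta(days=30),
          in_flight_ttl=timedelta(hours=24)):
    """Drop entries past retention, plus InFlight entries past their TTL."""
    for key in list(entries):
        entry = entries[key]
        limit = in_flight_ttl if entry["state"] == "InFlight" else retention
        if now - entry["stamped_at"] >= limit:
            del entries[key]


now = datetime(2026, 4, 23, tzinfo=timezone.utc)
entries = {
    "failed-run": {"run_id": "r1", "state": "InFlight", "stamped_at": now},
    "old-success": {"run_id": "r2", "state": "Succeeded",
                    "stamped_at": now - timedelta(days=31)},
    "crashed-pod": {"run_id": "r3", "state": "InFlight",
                    "stamped_at": now - timedelta(hours=25)},
    "fresh-run": {"run_id": "r4", "state": "InFlight", "stamped_at": now},
}
finalize(entries, "failed-run", "r1", succeeded=False)        # entry deleted
finalize(entries, "fresh-run", "someone-else", succeeded=True)  # no-op
finalize(entries, "fresh-run", "r4", succeeded=True)          # stamped Succeeded
sweep(entries, now)
print(sorted(entries))
```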

Schema cascade (just codegen)

  • RunStatus (rocky-core/src/state.rs) gains SkippedIdempotent + SkippedInFlight variants; rippled to Pydantic (run_schema.py) and TypeScript (run.ts).
  • RunOutput (rocky-cli/src/output.rs) gains status: RunStatus, skipped_by_run_id: Option<String>, idempotency_key: Option<String>.
  • [state.idempotency] TOML block with retention_days, dedup_on, in_flight_ttl_hours.
  • redb schema version 4 → 5 (new IDEMPOTENCY_KEYS table; NOT in LOCAL_ONLY_TABLE_NAMES — replicates across pods for the tiered backend).

Guardrails

  • --idempotency-key + --resume{,-latest} is a hard error at flag parse — resume is an explicit override and must never be short-circuited.
  • S3/GCS object paths use sha256(key) for safety + bounded length; original key stored verbatim inside the payload.
  • DAG sub-runs ignore the outer key to avoid sibling pipelines short-circuiting on a single stamp.
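
The sha256 guardrail is easy to illustrate: hashing the key gives a fixed-length, path-safe object name regardless of what the caller put in the key. The sketch below assumes a hypothetical layout (`<prefix>/idempotency/<digest>.json`) and a hypothetical helper name; only the use of `sha256(key)` comes from the PR.

```python
import hashlib


def idempotency_object_path(prefix, key):
    """Build an object path from the hex SHA-256 digest of the caller's key.

    The directory layout and the .json suffix are illustrative assumptions.
    """
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return f"{prefix.rstrip('/')}/idempotency/{digest}.json"


hostile = idempotency_object_path("state", "../../etc/passwd")
long_key = idempotency_object_path("state", "x" * 10_000)
print(hostile)
print(len(long_key))
```

Because the digest is always 64 hex characters, slashes, dots and very long keys never reach the object path; the verbatim key lives only inside the payload.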

Dagster

run() / run_streaming() / run_pipes() all gain idempotency_key: str | None = None. _build_run_args threads --idempotency-key to the subprocess. Docstring carries the secrets warning.
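
A rough sketch of how an optional key could be threaded into the subprocess arguments. This is not the actual `_build_run_args`; `build_run_args_sketch` and its `extra_args` parameter are hypothetical, and only the `--idempotency-key` flag and the `None` default come from the PR.

```python
def build_run_args_sketch(extra_args=(), idempotency_key=None):
    """Assemble `rocky run` arguments, adding the key only when one is given."""
    args = ["run", *extra_args]
    if idempotency_key is not None:
        # Keys are stored verbatim, so callers should never put secrets here.
        args += ["--idempotency-key", idempotency_key]
    return args


with_key = build_run_args_sketch(idempotency_key="nightly-2026-04-23")
without_key = build_run_args_sketch()
print(with_key)
print(without_key)
```

Leaving the default at `None` keeps existing callers unchanged: no key means no flag, and the run behaves as before.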

Tests

  • 1046 rocky-core tests pass, including 24 new tests covering:
    • Unit: IdempotencyCheck classification, DedupPolicy branching, object-store path stability, backend label resolution.
    • Integration (redb-backed): first claim proceeds, repeat succeeded skips, repeat in-flight-within-TTL skips, stale in-flight adopts, finalize-succeeded keeps entry, finalize-failed + Success policy deletes, finalize-failed + Any policy keeps, finalize with mismatched run_id is a no-op, sweep removes past-retention + stale-InFlight entries, concurrent claim is deterministic.
    • Object-store primitive: put_if_not_exists returns Created on first call, AlreadyExists on second.
  • 374 dagster pytest tests pass.
  • vscode tsc --noEmit clean.
  • cargo clippy --all-targets -- -D warnings clean.
  • cargo fmt --check clean.
  • just codegen idempotent (no drift after rerun).
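
The concurrent-claim property in the integration tests can be pictured with a small thread race: many callers try to claim the same key and exactly one wins. This is an illustrative model using a lock-guarded dictionary, not the redb test itself; `ClaimTable` and `race` are hypothetical names.

```python
import threading


class ClaimTable:
    """Lock-guarded claim map standing in for an atomic backend primitive."""

    def __init__(self):
        self._lock = threading.Lock()
        self._owners = {}

    def claim(self, key, run_id):
        with self._lock:
            if key in self._owners:
                return False
            self._owners[key] = run_id
            return True

    def owner(self, key):
        with self._lock:
            return self._owners.get(key)


def race(table, key, n_callers):
    """Start n_callers threads that all try to claim key at the same moment."""
    barrier = threading.Barrier(n_callers)
    winners = []

    def worker(run_id):
        barrier.wait()  # line every caller up before the claim attempt
        if table.claim(key, run_id):
            winners.append(run_id)

    threads = [threading.Thread(target=worker, args=(f"run-{i}",))
               for i in range(n_callers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners


table = ClaimTable()
winners = race(table, "nightly-2026-04-23", 16)
print(len(winners))
```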

POC

examples/playground/pocs/05-orchestration/09-idempotency-key/ — self-contained DuckDB POC demonstrating the flag acceptance + short-circuit. ./run.sh verifies:

  • Run 1 (fresh key) claims the entry.
  • Run 2 (same key) short-circuits with status = SkippedInFlight / SkippedIdempotent + prior run_id in skipped_by_run_id.
  • Run 3 (new key) claims independently.

Follows the POC-04 pattern (DuckDB POCs exit non-zero on "no discovery adapter" — orthogonal to the dedup primitive being validated).

Known follow-up

F1 in the plan §11 — today a run that errors before persist_run_record() leaves its InFlight entry until the 24h in_flight_ttl_hours sweep reaps it. Worst-case latency for retrying a crashed run with the same key is 24h. Tighten by extracting run_inner() (so the outer run() can catch errors and finalize) or adding a tokio::spawn-based drop guard on IdempotencyCtx. Dedicated follow-up PR.
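
The `run_inner()` option can be sketched as an outer wrapper that owns the claim and releases it on any early error. This is a simplified model under the default `dedup_on = "success"` policy, with a dictionary standing in for the state store; the function bodies and the `fail_early` parameter are hypothetical.

```python
def run_inner(fail_early):
    """Stand-in for the real work; may fail before a run record is persisted."""
    if fail_early:
        raise RuntimeError("failed before the run record was persisted")
    return "ok"


def run(claims, key, run_id, fail_early=False):
    """Claim the key, delegate to run_inner, and always finalize the claim."""
    claims[key] = {"run_id": run_id, "state": "InFlight"}
    try:
        result = run_inner(fail_early)
    except Exception:
        # Release our own claim so a retry does not have to wait for the TTL.
        if claims.get(key, {}).get("run_id") == run_id:
            del claims[key]
        raise
    claims[key]["state"] = "Succeeded"
    return result


claims = {}
try:
    run(claims, "nightly", "run-1", fail_early=True)
except RuntimeError:
    pass
released = "nightly" not in claims
retry_result = run(claims, "nightly", "run-2")
print(released, retry_result, claims["nightly"]["state"])
```

With the wrapper in place, the crashed first attempt leaves no InFlight entry behind, so the retry claims the key immediately.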

Test plan

  • CI green (engine-ci, dagster-ci, vscode-ci, codegen-drift)
  • cargo test -p rocky-core — 1046 pass
  • pytest integrations/dagster/tests/ — 374 pass
  • Manual: run examples/playground/pocs/05-orchestration/09-idempotency-key/run.sh, confirm Run 2 short-circuits with a skip status and surfaces Run 1's run_id

References

Adds `rocky run --idempotency-key <KEY>` — caller-supplied opaque key that
dedups a run against prior runs with the same key. Three outcomes:

- Seen, succeeded → exit 0 with `status = "SkippedIdempotent"` and prior
  `run_id` surfaced as `skipped_by_run_id`; no work done.
- Seen, in-flight → exit 0 with `status = "SkippedInFlight"`.
- Unseen (or prior InFlight past TTL — crashed-pod corpse) → proceed
  normally, stamp terminal state at every exit.

Defence-in-depth below Dagster's `run_key`; catches pod retries, Kafka
re-delivery, webhook duplicates, cron races. Works for non-Dagster
callers (CI, cron, webhooks) as well.

All three phases of the implementation plan
(`~/Developer/rocky-plans/plans/rocky-fr-004-idempotency-key.md`) land
together:

- Phase 1 (Option A, local + valkey + tiered): `state.redb.lock` + redb
  write txn on local; `SET NX EX` on Valkey / Tiered. `rocky-core`
  gains an `idempotency` module with the backend dispatch and a per-
  backend atomic claim primitive.
- Phase 2 (S3): `PutMode::Create` → `If-None-Match: "*"` via the
  `object_store` crate. New `ObjectStoreProvider::put_if_not_exists`.
- Phase 3 (GCS): same mechanism, `x-goog-if-generation-match: 0`.

Design decisions locked per FR discussion:

- Verbatim key storage (debuggability > privacy; CLI help + Dagster
  docstring warn against secrets in keys).
- `dedup_on = "success"` default; `"any"` optional. Failed runs under
  `"success"` delete the entry so retries can reclaim.
- 30-day retention default, swept during the existing state-upload
  cadence (no new cron). 24h `in_flight_ttl_hours` reaps crashed-pod
  InFlight stamps via `AdoptStale`.
- In-flight lock via each backend's native primitive — one source of
  truth per backend, no new services.

Schema surface (via `just codegen` cascade):

- `RunStatus` gains `SkippedIdempotent` + `SkippedInFlight` variants
  (rippled to Pydantic + TS).
- `RunOutput` gains `status`, `skipped_by_run_id`, `idempotency_key`
  fields.
- `[state.idempotency]` TOML block with `retention_days`, `dedup_on`,
  `in_flight_ttl_hours`.
- `redb` schema version bumped 4 → 5 (new `IDEMPOTENCY_KEYS` table;
  replicates by default, unlike `schema_cache`).

Guardrails:

- `--idempotency-key` + `--resume{,-latest}` is a clap-level error —
  resume is an explicit override and must never be short-circuited.
- S3/GCS object path uses `sha256(key)` so caller key shape can't
  break path safety; original key stored verbatim inside the payload.
- DAG sub-runs ignore the outer key to avoid sibling pipelines
  short-circuiting on a single stamp.

Dagster: `run()` / `run_streaming()` / `run_pipes()` gain
`idempotency_key: str | None = None`. `_build_run_args` threads
`--idempotency-key` to the subprocess. Docstring carries the "no
secrets" warning.

POC: `examples/playground/pocs/05-orchestration/09-idempotency-key/`
demonstrates the flag acceptance + skip verdict. Matches the POC-04
pattern (DuckDB POCs exit non-zero on "no discovery adapter" —
orthogonal; the dedup primitive is validated independently).

Tests: 24 new tests across `rocky_core::idempotency::tests` (claim
dispatch, stale adoption, DedupPolicy branching, object-store path
stability) and `rocky_core::state::tests` (redb round-trips, sweep
semantics, concurrent-claim determinism). All 1046 rocky-core tests
pass; 374 dagster pytest tests pass; vscode tsc clean.

Known follow-up (F1 in plan §11): a `rocky run` that errors before
`persist_run_record()` leaves its InFlight entry until the 24h
`in_flight_ttl_hours` sweep reaps it. Worst-case latency for a
retried crashed run with the same key is 24h. Either extract
`run_inner()` or add a `tokio::spawn` drop guard on `IdempotencyCtx`
to tighten (dedicated follow-up PR).

`just regen-fixtures` captures the new `status` / `skipped_by_run_id` /
`idempotency_key` fields on every `run` output fixture. All 13 run
fixtures gain `"status": "Success"`; the two skip fields stay absent
(serde skip_serializing_if on `None`). No behavioural change.

Closes the `codegen-drift` CI failure on PR #235.

hugocorreia90 merged commit 61ca164 into main on Apr 23, 2026
16 checks passed
hugocorreia90 deleted the feat/fr-004-idempotency-key branch on April 23, 2026 10:43
hugocorreia90 added a commit that referenced this pull request Apr 23, 2026
Coordinated bundle since the last release (engine-v1.14.0 / dagster-v1.10.0
/ vscode-v1.6.4, 2026-04-23 morning):

- **#235 feat(engine): FR-004 rocky run --idempotency-key dedup (Phase 1+2+3)**
  — caller-supplied opaque key that dedups a run against prior runs via the
  state store's native atomic primitive (redb write-txn for local, Valkey
  SET NX EX for valkey/tiered, PutMode::Create for s3, If-Generation-Match
  for gcs). New [state.idempotency] TOML block (30d retention default,
  dedup_on="success", 24h in-flight TTL). RunStatus enum gains
  SkippedIdempotent/SkippedInFlight; RunOutput gains status/skipped_by_run_id/
  idempotency_key fields. Dagster RockyResource.run*() gain idempotency_key
  kwarg. POC at examples/playground/pocs/05-orchestration/09-idempotency-key/.
  redb schema bump 4 -> 5 (additive, new IDEMPOTENCY_KEYS table).

- **#234 chore(deps): bump rand from 0.8.5 to 0.8.6** — routine dependabot
  update.

Version bumps:
- Engine: 20 Cargo.toml (19 engine crates + rocky-lsp) 1.14.0 -> 1.15.0
- Dagster: pyproject.toml 1.10.0 -> 1.11.0
- VS Code: package.json 1.6.4 -> 1.7.0

Regenerated:
- JSON schemas (schemas/*.schema.json) for new fields/enum variants
- Pydantic types (integrations/dagster/src/dagster_rocky/types_generated/)
- TS types (editors/vscode/src/types/generated/)
- rocky-project.schema.json mirror (editors/vscode/schemas/)
- Dagster test fixtures (tests/fixtures_generated/, 13 run fixtures gain
  status="Success")
- vscode package-lock.json (version sync)
hugocorreia90 added a commit that referenced this pull request Apr 23, 2026
…ation dispatches (FR-004 F2) (#239)

Before this change, a `rocky run --idempotency-key K` against a
Transformation / Quality / Snapshot / Load pipeline that returned
`Ok(())` left its `InFlight` claim in place. The four non-replication
dispatch arms delegated straight to `run_local::run_*` / `load::run_load`
and returned without calling `finalize_idempotency`; the error-path
wrapper shipped in #237 only fires on `is_err()`. A retry with the same
key inside `in_flight_ttl_hours` (default 24h) then returned
`skipped_in_flight` for up to 24h instead of `skipped_idempotent`.

Each arm now stamps `Succeeded` on its happy-path exit via a new
`finalize_idempotency_on_success` helper that mirrors
`finalize_idempotency_on_error`: one-shot `.take()` on the shared
`IdempotencyCtx`, opens its own `StateStore` from the canonical
`state_path` owned by `run()`, best-effort on error. The one-shot
semantics keep the outer error-path wrapper a no-op once the success
path has drained the ctx.
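
The one-shot behaviour described above can be modelled with a holder whose value can be taken only once, loosely analogous to `Option::take` in Rust. The sketch is illustrative: `OneShot`, the `stamps` list and both helper bodies are hypothetical, and the "released" label is a placeholder for whatever the error path does with the claim.

```python
class OneShot:
    """Holds a value that can be taken exactly once."""

    def __init__(self, value):
        self._value = value

    def take(self):
        value, self._value = self._value, None
        return value


stamps = []  # records which finalize path actually fired


def finalize_on_success(ctx):
    key = ctx.take()
    if key is not None:
        stamps.append((key, "Succeeded"))


def finalize_on_error(ctx):
    key = ctx.take()
    if key is not None:
        stamps.append((key, "released"))


ctx = OneShot("nightly-2026-04-23")
finalize_on_success(ctx)  # drains the ctx
finalize_on_error(ctx)    # no-op: nothing left to take
print(stamps)
```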

Replication was never affected — it already finalized correctly at its
main exit in #235.

Tested by driving `run()` end-to-end against a transformation pipeline
with `--idempotency-key` set and asserting the redb entry is
`Succeeded`, not `InFlight`. Pre-fix, the assertion fails with
`InFlight != Succeeded`. Existing F1 coverage
(`finalize_on_error_releases_inflight_claim_allowing_immediate_retry`,
`finalize_on_error_is_one_shot_and_idempotent_when_ctx_already_taken`,
`finalize_idempotency_is_one_shot_take`) continues to pass — the F1
wrapper remains safely idempotent against the new success-path
finalize.