Fix Perpetual rescheduling race condition #331
Adds a test that demonstrates the race described in RACE.md: when a running Perpetual task is externally replaced and finishes after the replacement, its `on_complete` overwrites the correctly-timed successor with stale timing data. The test uses `asyncio.Event` to deterministically orchestrate two concurrent executions of the same key, proving that the last `on_complete` to fire wins unconditionally. Marked `xfail` until we implement a fix (likely a generation counter, per RACE.md Option A).

Co-Authored-By: Claude Opus 4.6 <[email protected]>
When a running Perpetual task gets externally replaced (e.g. a config change triggers `docket.replace()`), two executions of the same key run concurrently. Both call `Perpetual.on_complete()` → `docket.replace()` when they finish, and the last one to complete wins unconditionally — potentially overwriting a correctly-timed successor with stale timing data.

The race looks like this:

1. Task A (`interval=2s`) is running
2. User changes config, calls `docket.replace()` for immediate re-execution
3. Task B (`interval=500ms`) starts and finishes first
4. B's `on_complete` schedules successor at `now+500ms` ✓
5. A finishes, A's `on_complete` schedules successor at `now+2s` ✗
6. The correctly-timed successor from B is gone

The fix adds a generation counter to the scheduling system. Every call to `schedule()` atomically increments a generation field in the runs hash (via `HINCRBY` in the Lua script). Each stream message carries the generation it was scheduled with. This gives us two layers of protection:

- **Head check** (worker, before claim): If a stale message arrives from the stream (redelivery after crash, pending message from a partition), the worker detects it's superseded and skips entirely — no claim, no function invocation, no state changes to the runs hash.
- **Tail check** (`Perpetual.on_complete`, after execution): If a task was already running when the replacement happened, its `on_complete` detects supersession and skips the stale reschedule. The function body ran (unavoidable since it was mid-execution) but the damage is contained.

Backwards compatible: old messages without a generation get gen=0; `HINCRBY` on a missing field creates it at 0 then increments, so the first new schedule is gen=1. Cron inherits the fix automatically through `super().on_complete()`.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
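The two-layer mechanism described above can be sketched in plain Python, with an in-memory dict standing in for the Redis runs hash. The names `runs_hash`, `schedule`, and `head_check` are illustrative, not docket's actual API:

```python
# Sketch of the generation counter, assuming an in-memory dict in place of
# the Redis runs hash. Names are illustrative, not docket's actual API.

runs_hash: dict[str, int] = {}

def schedule(key: str) -> dict:
    """Schedule a run; atomically bump the generation (HINCRBY-style)."""
    # HINCRBY on a missing field starts at 0, so the first schedule is gen=1.
    runs_hash[key] = runs_hash.get(key, 0) + 1
    # The stream message carries the generation it was scheduled with.
    return {"key": key, "generation": runs_hash[key]}

def head_check(message: dict) -> bool:
    """Return True if the message is still current and may be claimed."""
    return message["generation"] >= runs_hash.get(message["key"], 0)

stale = schedule("my-task")      # gen=1
current = schedule("my-task")    # gen=2 supersedes gen=1
assert not head_check(stale)     # stale message is skipped, never claimed
assert head_check(current)       # current message is allowed to run
```

The key property is that the hash always holds the latest generation, so any message carrying an older number can be dropped before the task is claimed.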
📚 Documentation has been built for this PR! You can download the documentation directly here.
Codecov Report

✅ All modified and coverable lines are covered by tests.

@@            Coverage Diff             @@
##             main     #331      +/-   ##
==========================================
+ Coverage   98.65%   98.67%   +0.02%
==========================================
  Files         102      103       +1
  Lines       10161    10323     +162
  Branches      491      497       +6
==========================================
+ Hits        10024    10186     +162
  Misses        121      121
  Partials       16       16
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d57e40259c
…ments

Generation 0 (the default when a message lacks the field) is now treated as "pre-tracking" and never considered superseded. This handles the case where an older worker's scheduler moves a task from queue to stream without passing through the generation field — the new worker would otherwise see gen=0 in the message vs gen=1 in the runs hash and incorrectly drop the task.

Two new tests cover the upgrade paths:

- Old→New: a message with no generation field runs normally
- New→Old scheduler→New worker: generation mismatch is tolerated

Co-Authored-By: Claude Opus 4.6 <[email protected]>
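The generation-0 escape hatch reduces to a small predicate. This is a sketch mirroring the behavior described above; the real `is_superseded()` may have a different signature:

```python
# Sketch of the supersession predicate with the pre-tracking escape hatch.
# A message generation of 0 means the message predates generation tracking
# (or passed through an older scheduler), so it is never considered stale.

def is_superseded(message_generation: int, current_generation: int) -> bool:
    if message_generation == 0:
        return False  # pre-tracking message: always allowed to run
    return message_generation < current_generation

# Old -> New: a message with no generation field defaults to 0 and runs.
assert is_superseded(0, 5) is False
# New -> Old scheduler -> New worker: 0 in message vs 1 in hash is tolerated.
assert is_superseded(0, 1) is False
# A genuinely stale message is still skipped.
assert is_superseded(1, 2) is True
```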
…ce()

The generation counter closes a broader gap than just Perpetual: when `replace()` runs XDEL on a stream message that a worker already read via XREADGROUP, the head check (before claim) skips the stale message. The test puts both gen=1 and gen=2 messages in the stream to simulate the XREADGROUP-before-XDEL race, and verifies only gen=2 executes.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
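The scenario can be simulated without Redis: two copies of the same key reach the worker (the gen=1 copy was read before `replace()`'s XDEL), and the head check lets only the current one execute. A rough sketch with illustrative names, not the test's actual code:

```python
# Rough simulation of the XREADGROUP-before-XDEL race: replace() XDELs the
# gen=1 message, but a worker had already read it, so both copies are seen.
# Only the gen=2 message should execute.

runs = {"task:gen": 2}  # replace() already bumped the generation to 2
pending = [
    {"key": "task", "generation": 1},  # stale copy, read before XDEL
    {"key": "task", "generation": 2},  # current copy
]

executed = []
for msg in pending:
    if msg["generation"] < runs["task:gen"]:
        continue  # head check: superseded, skip without claiming
    executed.append(msg["generation"])

assert executed == [2]  # only the current generation ran
```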
The generation check was a separate HGET before `claim()`, adding a Redis round-trip to every task execution. Since `claim()` already hits the same runs hash in a Lua script, the supersession check now happens atomically inside it — zero additional cost on the hot path. `claim()` returns False if the task was superseded, True if claimed. `is_superseded()` is kept for the tail check in `Perpetual.on_complete()`.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
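Folding the check into `claim()` might look like this in outline: a plain-Python stand-in for the atomic Lua script, with an assumed dict-shaped runs hash rather than the real Redis structures:

```python
# Simulation of claim() with the supersession check folded in: one atomic
# step (here a single function; in the real system, a Lua script) that
# compares generations and claims only if the message is still current.

runs_hash = {"task": {"generation": 2, "state": "scheduled"}}

def claim(key: str, message_generation: int) -> bool:
    """Return True if claimed, False if the message was superseded."""
    entry = runs_hash[key]
    # generation 0 is pre-tracking and never superseded (upgrade path)
    if 0 < message_generation < entry["generation"]:
        return False  # superseded: no claim, no state change
    entry["state"] = "running"
    return True

assert claim("task", 1) is False                   # stale: skipped
assert runs_hash["task"]["state"] == "scheduled"   # hash untouched
assert claim("task", 2) is True                    # current: claimed
assert runs_hash["task"]["state"] == "running"
```

Because the compare and the claim happen in one step, there is no window for the hash to change between the check and the write.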
Replace the for/if/break loop with `next()` to avoid untaken branches that some Python versions flag as partial coverage.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
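Since the actual loop isn't shown on this page, here is a generic sketch of the pattern being described:

```python
# Generic sketch of the refactor: a for/if/break search loop leaves an
# untaken branch when the match is found early, which branch-coverage
# tools can flag as a partial branch. next() over a generator expresses
# the same first-match search with no explicit branches.

items = [("a", 1), ("b", 2), ("c", 3)]

# Before: loop with break
found = None
for name, value in items:
    if name == "b":
        found = value
        break

# After: next() with a default, no loop branches left to cover
found_next = next((value for name, value in items if name == "b"), None)

assert found == found_next == 2
```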
The supersession check now happens right after the strikelist check, before execution counts, timing, or any lifecycle metrics. This means superseded tasks don't affect any counters at all.

A new TASKS_SUPERSEDED counter tracks how often this happens, with a `docket.where` label to distinguish the two check points:

- "worker": head check in `claim()` before the task runs
- "on_complete": tail check in `Perpetual.on_complete()` after it runs

Co-Authored-By: Claude Opus 4.6 <[email protected]>
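The counter's shape can be approximated with `collections.Counter` as a stand-in for the real metrics backend; the `record_superseded` helper is hypothetical, and the two label values come from the list above:

```python
# Stand-in for the TASKS_SUPERSEDED metric using collections.Counter;
# the real code would use the project's metrics library. The two
# docket.where label values come straight from the description above.
from collections import Counter

TASKS_SUPERSEDED: Counter = Counter()

def record_superseded(where: str) -> None:
    assert where in ("worker", "on_complete")
    TASKS_SUPERSEDED[where] += 1

record_superseded("worker")       # head check skipped a stale message
record_superseded("worker")
record_superseded("on_complete")  # tail check skipped a stale reschedule

assert TASKS_SUPERSEDED["worker"] == 2
assert TASKS_SUPERSEDED["on_complete"] == 1
```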
When a Perpetual task finishes, `on_complete()` calls `docket.replace()` to schedule the next run, which writes the successor's generation/state to the runs hash. Then `_mark_as_terminal()` unconditionally overwrites that with `state=completed`, or (with `execution_ttl=0`) deletes the entire hash. The successor becomes invisible.

The fix uses a Lua script in `_mark_as_terminal` that atomically checks the generation counter before writing. If `replace()` already bumped the generation, the in-hand execution is superseded and the write is skipped. Progress cleanup and pub/sub notifications still fire unconditionally. For non-Perpetual tasks nothing changes — no `replace()` means the generation still matches and the write proceeds normally.

Also parametrizes `test_perpetual_race.py` over `execution_ttl` to catch both the TTL=0 (hash deletion) and TTL>0 (state overwrite) variants, and adds a pub/sub event test for Perpetual completion.

Relates to #331

Co-Authored-By: Claude Opus 4.6 <[email protected]>
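The guarded terminal write reduces to a compare-before-write. A plain-Python simulation of what the Lua script does, with illustrative field names:

```python
# Simulation of the guarded _mark_as_terminal: only write the terminal
# state if the generation we executed is still the current one. If
# replace() already bumped it (a Perpetual successor was scheduled),
# skip the write so the successor's entry is not clobbered or deleted.

runs_hash = {"task": {"generation": 2, "state": "scheduled"}}  # successor

def mark_as_terminal(key: str, executed_generation: int) -> bool:
    entry = runs_hash.get(key)
    if entry is not None and entry["generation"] > executed_generation:
        return False  # superseded: leave the successor's entry alone
    runs_hash[key] = {"generation": executed_generation, "state": "completed"}
    return True

# The gen=1 execution finishes after replace() wrote the gen=2 successor:
assert mark_as_terminal("task", 1) is False
assert runs_hash["task"]["state"] == "scheduled"   # successor survives
# A non-Perpetual task (no replace, generations match) proceeds normally:
assert mark_as_terminal("task", 2) is True
assert runs_hash["task"]["state"] == "completed"
```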
The race

When a running Perpetual task gets externally replaced (e.g. a config change triggers `docket.replace()`), two executions of the same key run concurrently. Both call `Perpetual.on_complete()` → `docket.replace()` when they finish, and the last one to complete wins unconditionally — potentially overwriting a correctly-timed successor with stale timing data.

Here's the timeline:

1. Task A (`interval=2s`) is running
2. User changes config, calls `docket.replace()` for immediate re-execution
3. Task B (`interval=500ms`) starts and finishes first
4. B's `on_complete` schedules successor at `now+500ms` ✓
5. A finishes, A's `on_complete` schedules successor at `now+2s` ✗
6. The correctly-timed successor from B is gone

The first commit adds a test that reproduces this deterministically using `asyncio.Event` to control the ordering of completions.

The fix: generation counter

Every call to `schedule()` atomically increments a `generation` field in the runs hash (via `HINCRBY` in the Lua script). Each stream message carries the generation it was scheduled with. This gives us two layers of protection:

- **Head check** (worker `_execute`, before `claim()`): If a stale message arrives from the stream (redelivery after crash, pending message from a partition), the worker detects it's superseded and skips entirely — no claim, no function invocation, no state corruption to the runs hash.
- **Tail check** (`Perpetual.on_complete`, after execution): If a task was already running when the replacement happened, its `on_complete` detects supersession and skips the stale reschedule. The function body ran (unavoidable — it was mid-execution) but the damage is contained.

Cron inherits the fix automatically through `super().on_complete()`.

Backwards compatibility

- Old messages without `generation` get `gen=0` via the `from_message()` default
- `HINCRBY` on a missing field creates it at 0 then increments → first new schedule is gen=1

🤖 Generated with Claude Code
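The `HINCRBY` semantics the compatibility story relies on (a missing field reads as 0, then increments) can be modeled in miniature; `hincrby` here is a toy stand-in for the Redis command, not a real client call:

```python
# Miniature model of Redis HINCRBY: a missing field is treated as 0 and
# then incremented, so the very first schedule() after the upgrade
# produces generation 1, while old messages default to generation 0.

def hincrby(hash_: dict, field: str, amount: int = 1) -> int:
    hash_[field] = hash_.get(field, 0) + amount
    return hash_[field]

runs = {}                                    # pre-upgrade: no generation field
old_message_gen = runs.get("generation", 0)  # old messages read as gen=0
first = hincrby(runs, "generation")          # first post-upgrade schedule

assert old_message_gen == 0
assert first == 1
```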