Unify cancellation and cleanup handling #296
This plan addresses several related issues with how docket handles asyncio task cancellation:

- `docket.cancel()` doesn't work correctly for tasks with Timeout dependencies (CancelledError gets converted to TimeoutError)
- Shield + catch patterns in cleanup code swallow CancelledError, preventing proper propagation
- No way to distinguish "I cancelled this" from "someone cancelled me"

The plan proposes:

- Using `task.cancel(msg=...)` with sentinel messages to identify our own cancellations (works on Python 3.10+)
- Adopting the `taskgroup` backport for worker internal tasks (scheduler, lease renewal, cancellation listener) to get proper structured concurrency
- Removing unnecessary shields from cleanup code
- Simplifying internal loop functions by letting TaskGroup handle cancellation

References:

- https://docs.python.org/3/library/asyncio-task.html
- python/cpython#103486
- https://github.com/graingert/taskgroup

Co-Authored-By: Claude Opus 4.5 <[email protected]>
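To make the first issue concrete, here is a minimal sketch (not docket's actual code) of the buggy pattern: a timeout wrapper that converts *every* `CancelledError` into `TimeoutError`, so an external cancellation of the caller is masked as a timeout instead of propagating.

```python
import asyncio


async def run_with_timeout(coro, timeout: float):
    """Buggy sketch: converts ALL CancelledError to TimeoutError,
    including cancellations that came from outside (e.g. docket.cancel())."""
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.CancelledError:
        # Wrong: this may be someone cancelling *us*, not our timeout firing
        raise TimeoutError("task timed out") from None
```

Cancelling the wrapping task externally surfaces as `TimeoutError` rather than `CancelledError`, which is exactly the `docket.cancel()` symptom described above.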
…tion

When cancelling asyncio tasks, we need to distinguish between cancellations we initiated (timeout, cleanup) and external cancellations (shutdown, `docket.cancel()`). This uses `task.cancel(msg=...)` to pass sentinel messages.

Changes:

- Add `_cancellation.py` with `CANCEL_MSG_TIMEOUT`, `CANCEL_MSG_CLEANUP`, and an `is_our_cancellation()` helper
- worker.py: Use the sentinel in `_run_function_with_timeout` (only convert OUR CancelledError to TimeoutError; propagate external cancellations)
- worker.py: Use the sentinel for heartbeat and cancellation listener cleanup
- strikelist.py: Use the sentinel for monitor task cleanup
- _concurrency.py: Use the sentinel for lease renewal task cleanup
- Add a regression test for cancelling a running task with a Timeout dependency

Co-Authored-By: Claude Opus 4.5 <[email protected]>
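A sketch of what the sentinel mechanism could look like. The names `CANCEL_MSG_TIMEOUT`, `CANCEL_MSG_CLEANUP`, and `is_our_cancellation()` come from the commit; the bodies and sentinel values here are assumptions for illustration.

```python
import asyncio

# Hypothetical sentinel values; the real module may use different strings
CANCEL_MSG_TIMEOUT = "docket:timeout"
CANCEL_MSG_CLEANUP = "docket:cleanup"
_OUR_MESSAGES = {CANCEL_MSG_TIMEOUT, CANCEL_MSG_CLEANUP}


def is_our_cancellation(exc: asyncio.CancelledError) -> bool:
    """True if this CancelledError carries one of our sentinel messages.

    task.cancel(msg=...) attaches msg as exc.args[0] when it propagates
    (reliable on Python 3.11+; on 3.10 the message may not reach the
    cancelled coroutine)."""
    return bool(exc.args) and exc.args[0] in _OUR_MESSAGES
```

With this helper, a timeout wrapper can convert only its own cancellation to `TimeoutError` and re-raise anything external.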
The shields were added to ensure cleanup completes even under cancellation, but they swallow CancelledError, which can mask issues. Cleanup code now uses plain try/except Exception blocks, allowing CancelledError to propagate if cleanup is interrupted.

Files changed:

- _redis.py: Remove shields from `__aexit__` and `_cluster_pubsub`
- _result_store.py: Remove shield from `__aexit__`
- docket.py: Remove shields from `__aexit__`, add try/except around strike_list

Co-Authored-By: Claude Opus 4.5 <[email protected]>
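The before/after shapes, as a generic sketch (function names are illustrative, not docket's):

```python
import asyncio


async def close_quietly_old(resource_close):
    """Old pattern: shield the cleanup and swallow everything.
    CancelledError never escapes, so the enclosing task can't tell
    it was cancelled mid-cleanup."""
    try:
        await asyncio.shield(resource_close())
    except BaseException:
        pass


async def close_quietly_new(resource_close):
    """New pattern: swallow only ordinary Exceptions.
    CancelledError (a BaseException) propagates, so interruption
    during cleanup is visible to callers."""
    try:
        await resource_close()
    except Exception:
        pass
```

The key detail is that `CancelledError` subclasses `BaseException`, not `Exception`, so `except Exception` lets it through by construction.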
These code paths handle external cancellation during cleanup operations. They're defensive branches that are difficult to trigger in tests but ensure we don't accidentally swallow external CancelledErrors. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Uses AsyncExitStack for managing cleanup of Redis connections, result storage, and strike lists. This ensures all cleanups run even if one fails, with exceptions properly propagated.

Changes:

- RedisConnection, ResultStorage, and Docket use AsyncExitStack for cleanup
- Added an AsyncCloseable protocol and a close_resource helper in _redis.py
- Replaced if-checks with asserts for context manager entry validation
- Removed tests for suppressed cleanup errors (behavior changed to propagate)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
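A minimal sketch of the `AsyncExitStack` pattern, with an invented `Resource` class standing in for the real context managers. Callbacks run in LIFO order, so registering the attribute-clearing callback first guarantees it runs after the async close.

```python
import contextlib


class Resource:
    """Sketch: cleanup registered on an AsyncExitStack runs LIFO,
    and exceptions from callbacks propagate instead of being masked."""

    async def __aenter__(self):
        self._stack = contextlib.AsyncExitStack()
        self.log = []
        # Registered FIRST, so it runs LAST: clear state only after
        # the async close below has completed.
        self._stack.callback(self.log.append, "cleared attribute")
        self._stack.push_async_callback(self._close)
        return self

    async def _close(self):
        self.log.append("closed connection")

    async def __aexit__(self, *exc_info):
        try:
            await self._stack.aclose()
        finally:
            del self._stack
```

`aclose()` runs every callback even if an earlier one raises, then re-raises, which is what replaces the scattered try/finally blocks.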
Refactors the worker loop to use event-based synchronization instead of cancellation for clean shutdown. Internal tasks (`_cancellation_listener`, `_scheduler_loop`, `_renew_leases`) now check `_worker_stopping` periodically and use interruptible waits so they respond quickly to shutdown signals.

Key changes:

- Replaced blocking `pubsub.listen()` iteration with polling via `pubsub.get_message(timeout=0.1)` in the cancellation listener
- Internal loops use `asyncio.wait_for(_worker_stopping.wait(), timeout=...)` for interruptible sleeps
- Manual task management with `asyncio.create_task()` instead of TaskGroup to preserve exception propagation behavior
- The finally block cancels unresponsive infra tasks before gathering

Co-Authored-By: Claude Opus 4.5 <[email protected]>
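The interruptible-sleep shape, as a generic sketch (loop body and names are illustrative): instead of `asyncio.sleep(interval)`, wait on the stopping event with a timeout so shutdown interrupts the sleep immediately.

```python
import asyncio


async def scheduler_loop(stopping: asyncio.Event, interval: float) -> int:
    """Sketch of an internal loop with interruptible sleeps.
    Returns the number of iterations, just for demonstration."""
    ticks = 0
    while not stopping.is_set():
        ticks += 1
        # ... do one round of work here ...
        try:
            # Sleeps up to `interval`, but wakes as soon as `stopping` is set
            await asyncio.wait_for(stopping.wait(), timeout=interval)
            return ticks  # stopping was set during the wait
        except asyncio.TimeoutError:
            continue  # interval elapsed; run the next round
    return ticks
```

Setting the event wakes the loop from a long interval without cancelling the task, which is the whole point of event-based shutdown.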
Switches from manual asyncio.create_task() management to TaskGroup for the worker's internal infrastructure tasks. This provides proper structured concurrency where tasks are tied to a scope. When exceptions occur in the main work loop, they're now wrapped in ExceptionGroup (the modern Python 3.11+ pattern for concurrent exceptions). Tests updated to check for the wrapped exception using ExceptionGroup. Co-Authored-By: Claude Opus 4.5 <[email protected]>
These paths handle race conditions during worker shutdown where exceptions occur simultaneously with the stopping event. They're defensive fallbacks that are difficult to test reliably. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Plan has been fully implemented. Co-Authored-By: Claude Opus 4.5 <[email protected]>
📚 Documentation has been built for this PR!
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #296      +/-  ##
==========================================
- Coverage   99.63%   99.63%   -0.01%
==========================================
  Files          95       96       +1
  Lines        9598     9591       -7
  Branches      466      459       -7
==========================================
- Hits         9563     9556       -7
  Misses         20       20
  Partials       15       15
```
The while loop always exits via return when the stopping event is set, so the condition's exit branch is never taken. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Registers cleanup callbacks immediately where resources are initialized rather than grouping them in `__aexit__`. This follows the pattern of registering the delattr/setattr callback BEFORE the async cleanup, so LIFO ordering ensures attributes are cleared AFTER resources close.

- _redis.py: Uses `setattr(..., None)` for conditional resources (cluster vs standalone mode)
- _result_store.py: Uses delattr for pool/client, removes `| None` types
- docket.py: Uses the `del self._stack` pattern
- worker.py: Uses delattr with correct LIFO ordering
- strikelist.py: Always creates `_stack` in `__aenter__`, removes the hasattr check since `__aexit__` without `__aenter__` is invalid
- _cancellation.py: Adds a `cancel_task()` helper and sentinel constants for distinguishing internal vs external cancellation

Co-Authored-By: Claude Opus 4.5 <[email protected]>
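Putting the pieces together: a sketch of `cancel_task()` used directly as an exit-stack callback, registered right where the task is created. The `Monitor` class and sentinel value are illustrative; only the helper's name comes from the commit.

```python
import asyncio
import contextlib

# Hypothetical sentinel; the real module may use a different value
CANCEL_MSG_CLEANUP = "docket:cleanup"


async def cancel_task(task: asyncio.Task) -> None:
    """Cancel with a sentinel message and await the task, suppressing
    only the CancelledError produced by this cancellation."""
    task.cancel(msg=CANCEL_MSG_CLEANUP)
    with contextlib.suppress(asyncio.CancelledError):
        await task


class Monitor:
    """Sketch: cleanup registered immediately where the task is created.
    LIFO order: delattr is registered first (runs last), cancel_task
    second (runs first)."""

    async def __aenter__(self):
        self._stack = contextlib.AsyncExitStack()
        self._task = asyncio.ensure_future(self._run())
        self._stack.callback(delattr, self, "_task")
        self._stack.push_async_callback(cancel_task, self._task)
        return self

    async def _run(self):
        while True:
            await asyncio.sleep(0.01)

    async def __aexit__(self, *exc_info):
        try:
            await self._stack.aclose()
        finally:
            del self._stack
```

Each callback does exactly one thing (cancel the task vs clear the attribute), matching the "split cleanup callbacks" point from the review.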
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d3d5360cc6
```python
"""Connect to Redis when entering the context."""
if self.is_connected:
    return self
```
Preserve RedisConnection re-entrancy across nested contexts

`__aenter__` now returns immediately when `is_connected` is true, but `__aexit__` always assumes `_stack` exists and deletes it. In nested or repeated use (e.g., `async with conn:` inside another `async with conn:`, or manual `__aenter__`/`__aexit__` pairs), the inner exit closes the connection and deletes `_stack`, so the outer exit raises AttributeError and skips cleanup. This is a regression from the previously idempotent exit behavior; consider ref-counting or avoiding `_stack` teardown on re-entry.
src/docket/strikelist.py (outdated)
```python
if self._redis is None:
    return self  # No Redis connection needed (local-only mode)

if self._redis.is_connected:
    return self
```
Avoid losing StrikeList cleanup on re-entrant `__aenter__`

`StrikeList.__aenter__` now creates a new AsyncExitStack and returns early if `_redis` is already connected. If the same StrikeList is re-entered while connected (nested context, or another caller enters it first), that new stack has no cleanup callbacks, so `__aexit__` won't cancel `_monitor_task` or close Redis, leaving the monitor task and connection running. Consider reusing the existing stack or guarding against re-entry to preserve cleanup.
A few cleanup items from the PR review:

- Fix Python 3.10/3.11 cancellation handling: on 3.11+ we can check the cancel message and re-raise external cancellations; on 3.10 the message doesn't propagate, so we have to accept the uncertainty
- Add try/finally to all `__aexit__` methods so `del self._stack` runs even if the stack cleanup raises
- Use `cancel_task` directly as AsyncExitStack callbacks instead of wrapper methods (strikelist, worker, concurrency)
- Split cleanup callbacks so each does one thing (cancel task vs clear attributes)
- Move the protocol definition right above the function that uses it

Also improved asyncio task naming to include the docket/strikelist name for better debugging and stack traces (e.g., "mydocket - scheduler" instead of "docket.worker.scheduler").

Co-Authored-By: Claude Opus 4.5 <[email protected]>
- _cancellation.py line 63: only hit on 3.11+ when the message propagates
- worker.py line 1181: branch for task not found in the cancellation handler

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When a perpetual task called `perpetual.cancel()`, `_perpetuate_if_requested` would call `docket.cancel(execution.key)` which publishes a pub/sub signal. The cancellation listener (running in the same worker) would receive this signal and cancel the very task that sent it - a self-cancellation race. This showed up as flaky coverage: line 1047 (`return False`) was covered with the memory backend but not with real Redis, because the in-memory pub/sub doesn't give the listener time to process the message before the call returns. The fix uses `_cancel()` directly for Redis cleanup without the pub/sub signal. When a perpetual task says "don't reschedule me", we're completing normally, not cancelling a running task - we ARE that task. Co-Authored-By: Claude Opus 4.5 <[email protected]>
RedisConnection and StrikeList were silently returning early if __aenter__ was called twice, but __aexit__ would then try to clean up resources from the wrong context. Instead of supporting re-entrancy, we now assert to fail fast on misuse. Sequential reuse (enter, exit, enter again) is still supported. Co-Authored-By: Claude Opus 4.5 <[email protected]>
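The fail-fast shape, as a generic sketch (the `Connection` class here is illustrative): assert on entry that no stack is live, so nested entry raises immediately while sequential reuse still works because `__aexit__` deletes the stack.

```python
import contextlib


class Connection:
    """Sketch: assert-based guard against re-entrant __aenter__.

    Nested `async with conn:` fails fast with AssertionError instead of
    letting a second entry corrupt the first context's cleanup.
    Sequential enter/exit/enter is fine: __aexit__ deletes _stack."""

    async def __aenter__(self):
        assert not hasattr(self, "_stack"), "context already entered"
        self._stack = contextlib.AsyncExitStack()
        return self

    async def __aexit__(self, *exc_info):
        try:
            await self._stack.aclose()
        finally:
            del self._stack
```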
Summary
Docket's asyncio cancellation and cleanup logic has grown organically across
several context managers, each with slightly different patterns for handling
shutdown, task cancellation, and exception swallowing. This made the codebase
harder to reason about and occasionally led to tasks that were difficult to
kill cleanly.
This PR unifies all the cleanup and cancellation handling around two patterns:
1. **AsyncExitStack for resource cleanup** - All context managers (`Docket`, `Worker`, `StrikeList`, `RedisConnection`, `ResultStorage`, `ConcurrencyLimit`) now use `AsyncExitStack` with callbacks. This gives us LIFO cleanup order, consistent exception handling, and eliminates the scattered try/except/finally blocks with manual `del` statements and `asyncio.shield()` calls.

2. **Sentinel-based cancellation** - A new `_cancellation` module provides `cancel_task()`, which uses message sentinels (`task.cancel(msg=...)`) to distinguish internal cancellation (timeouts, cleanup) from external cancellation (signals, TaskGroup teardown). This fixes a bug where `docket.cancel()` didn't work for tasks with `Timeout` dependencies because all `CancelledError` was being converted to `TimeoutError`.

The worker loop now uses `TaskGroup` for structured concurrency of its infrastructure tasks (scheduler, heartbeat, cancellation listener, lease renewal). This means cleaner shutdown - when the main loop exits, the TaskGroup automatically cancels and awaits all infrastructure tasks.

Other improvements

- Asyncio task names now include the docket/strikelist name (e.g., `"mydocket - scheduler"` instead of `"docket.worker.scheduler"`) for easier debugging
- Fixed a race where `perpetual.cancel()` would self-cancel via pub/sub before completing cleanup
- Cancel message propagation differs between Python versions, so `cancel_task()` handles both

🤖 Generated with Claude Code