
Unify cancellation and cleanup handling#296

Merged
chrisguidry merged 15 commits into main from fix-cancellation-handling
Jan 24, 2026

Conversation

@chrisguidry (Owner) commented Jan 23, 2026

Summary

Docket's asyncio cancellation and cleanup logic has grown organically across
several context managers, each with slightly different patterns for handling
shutdown, task cancellation, and exception swallowing. This made the codebase
harder to reason about and occasionally led to tasks that were difficult to
kill cleanly.

This PR unifies all the cleanup and cancellation handling around two patterns:

  1. AsyncExitStack for resource cleanup - All context managers (Docket,
    Worker, StrikeList, RedisConnection, ResultStorage, ConcurrencyLimit)
    now use AsyncExitStack with callbacks. This gives us LIFO cleanup order,
    consistent exception handling, and eliminates the scattered try/except/finally
    blocks with manual del statements and asyncio.shield() calls.

  2. Sentinel-based cancellation - A new _cancellation module provides
    cancel_task() which uses message sentinels (task.cancel(msg=...)) to
    distinguish internal cancellation (timeouts, cleanup) from external
    cancellation (signals, TaskGroup teardown). This fixes a bug where
    docket.cancel() didn't work for tasks with Timeout dependencies, because
    every CancelledError was being converted to TimeoutError.

The worker loop now uses TaskGroup for structured concurrency of its
infrastructure tasks (scheduler, heartbeat, cancellation listener, lease
renewal). This means cleaner shutdown - when the main loop exits, the TaskGroup
automatically cancels and awaits all infrastructure tasks.

Other improvements

  • Better asyncio task names include the docket name (e.g., "mydocket - scheduler"
    instead of "docket.worker.scheduler") for easier debugging
  • Assertions prevent accidental re-entrant context manager usage
  • Fixed a race where perpetual tasks calling perpetual.cancel() would
    self-cancel via pub/sub before completing cleanup
  • Python 3.10/3.11 compatibility: cancel message propagation differs between
    versions, so cancel_task() handles both
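A sketch of the version-aware behavior (the real `cancel_task()` may differ): on 3.11+ the cancel message reaches the awaiter, so an unexpected message means someone else cancelled the task and we re-raise; on 3.10 the message may be lost, so we accept the uncertainty and swallow the CancelledError either way.

```python
import asyncio
import sys

# Hypothetical sketch; docket._cancellation's actual cancel_task may differ.
async def cancel_task(task: asyncio.Task, msg: str) -> None:
    task.cancel(msg)
    try:
        await task
    except asyncio.CancelledError as e:
        if sys.version_info >= (3, 11) and e.args and e.args[0] != msg:
            raise  # not our sentinel: propagate the external cancellation
```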

🤖 Generated with Claude Code

chrisguidry and others added 9 commits January 23, 2026 09:08
This plan addresses several related issues with how docket handles asyncio
task cancellation:

- `docket.cancel()` doesn't work correctly for tasks with Timeout dependencies
  (CancelledError gets converted to TimeoutError)
- Shield + catch patterns in cleanup code swallow CancelledError, preventing
  proper propagation
- No way to distinguish "I cancelled this" from "someone cancelled me"

The plan proposes:
- Using `task.cancel(msg=...)` with sentinel messages to identify our own
  cancellations (works on Python 3.10+)
- Adopting `taskgroup` backport for worker internal tasks (scheduler, lease
  renewal, cancellation listener) to get proper structured concurrency
- Removing unnecessary shields from cleanup code
- Simplifying internal loop functions by letting TaskGroup handle cancellation

References:
- https://docs.python.org/3/library/asyncio-task.html
- python/cpython#103486
- https://github.com/graingert/taskgroup

Co-Authored-By: Claude Opus 4.5 <[email protected]>
…tion

When cancelling asyncio tasks, we need to distinguish between cancellations
we initiated (timeout, cleanup) vs external cancellations (shutdown, docket.cancel()).
This uses task.cancel(msg=...) to pass sentinel messages.

Changes:
- Add _cancellation.py with CANCEL_MSG_TIMEOUT, CANCEL_MSG_CLEANUP, and
  is_our_cancellation() helper
- worker.py: Use sentinel in _run_function_with_timeout (only convert OUR
  CancelledError to TimeoutError, propagate external cancellations)
- worker.py: Use sentinel for heartbeat and cancellation listener cleanup
- strikelist.py: Use sentinel for monitor task cleanup
- _concurrency.py: Use sentinel for lease renewal task cleanup
- Add regression test for cancelling running task with Timeout dependency

Co-Authored-By: Claude Opus 4.5 <[email protected]>
The shields were added to ensure cleanup completes even under cancellation, but
they swallow CancelledErrors which can mask issues. Cleanup code now uses plain
try/except Exception blocks, allowing CancelledError to propagate if cleanup is
interrupted.

Files changed:
- _redis.py: Remove shields from __aexit__ and _cluster_pubsub
- _result_store.py: Remove shield from __aexit__
- docket.py: Remove shields from __aexit__, add try/except around strike_list

Co-Authored-By: Claude Opus 4.5 <[email protected]>
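The cleanup shape this commit moves to can be sketched as follows (function name hypothetical): catch `Exception` only, so cleanup failures are tolerated while CancelledError, which is a BaseException since Python 3.8, still propagates if cleanup is interrupted.

```python
import asyncio

async def best_effort_close(close_coro) -> None:
    try:
        await close_coro  # e.g. a connection's aclose()
    except Exception:
        pass  # cleanup is best-effort; a real implementation would log
    # no shield, no `except BaseException`: an external cancel during
    # cleanup escapes this function as it should
```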
These code paths handle external cancellation during cleanup operations.
They're defensive branches that are difficult to trigger in tests but ensure
we don't accidentally swallow external CancelledErrors.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Uses AsyncExitStack for managing cleanup of Redis connections, result
storage, and strike lists. This ensures all cleanups run even if one
fails, with exceptions properly propagated.

Changes:
- RedisConnection, ResultStorage, Docket use AsyncExitStack for cleanup
- Added AsyncCloseable protocol and close_resource helper in _redis.py
- Replaced if-checks with asserts for context manager entry validation
- Removed tests for suppressed cleanup errors (behavior changed to propagate)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
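A minimal sketch of the AsyncExitStack pattern (class and callback names hypothetical): cleanups are registered on the stack, and `__aexit__` just closes it, which runs callbacks in LIFO order and propagates exceptions instead of suppressing them.

```python
import asyncio
from contextlib import AsyncExitStack

class Resource:
    async def __aenter__(self):
        self.log: list[str] = []
        self._stack = AsyncExitStack()
        # registered first, so it runs LAST on close
        self._stack.push_async_callback(self._close_client)
        # registered second, so it runs FIRST on close
        self._stack.callback(self.log.append, "monitor stopped")
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self._stack.aclose()

    async def _close_client(self) -> None:
        self.log.append("client closed")
```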
Refactors the worker loop to use event-based synchronization instead of
cancellation for clean shutdown. Internal tasks (`_cancellation_listener`,
`_scheduler_loop`, `_renew_leases`) now check `_worker_stopping` periodically
and use interruptible waits so they respond quickly to shutdown signals.

Key changes:
- Replaced blocking `pubsub.listen()` iteration with polling via
  `pubsub.get_message(timeout=0.1)` in the cancellation listener
- Internal loops use `asyncio.wait_for(_worker_stopping.wait(), timeout=...)`
  for interruptible sleeps
- Manual task management with `asyncio.create_task()` instead of TaskGroup
  to preserve exception propagation behavior
- Finally block cancels unresponsive infra tasks before gathering

Co-Authored-By: Claude Opus 4.5 <[email protected]>
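The interruptible-sleep pattern can be sketched like this (names hypothetical): instead of `asyncio.sleep(interval)`, each loop waits on the stopping event with a timeout, so it wakes immediately when shutdown is signalled.

```python
import asyncio
from contextlib import suppress

async def scheduler_loop(
    stopping: asyncio.Event, interval: float, passes: list[str]
) -> None:
    while not stopping.is_set():
        passes.append("tick")  # stand-in for one scheduling pass
        # wait_for raises asyncio.TimeoutError on 3.10 and the builtin
        # TimeoutError on 3.11+ (they're the same class there)
        with suppress(TimeoutError, asyncio.TimeoutError):
            await asyncio.wait_for(stopping.wait(), timeout=interval)
```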
Switches from manual asyncio.create_task() management to TaskGroup for
the worker's internal infrastructure tasks. This provides proper structured
concurrency where tasks are tied to a scope.

When exceptions occur in the main work loop, they're now wrapped in
ExceptionGroup (the modern Python 3.11+ pattern for concurrent exceptions).
Tests updated to check for the wrapped exception using ExceptionGroup.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
These paths handle race conditions during worker shutdown where
exceptions occur simultaneously with the stopping event. They're
defensive fallbacks that are difficult to test reliably.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Plan has been fully implemented.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@github-actions bot commented Jan 23, 2026

📚 Documentation has been built for this PR!

You can download the documentation directly here:
https://github.com/chrisguidry/docket/actions/runs/21307073224/artifacts/5241592663

@codecov-commenter commented Jan 23, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 99.63%. Comparing base (c3167d5) to head (b9d436f).

Additional details and impacted files


@@            Coverage Diff             @@
##             main     #296      +/-   ##
==========================================
- Coverage   99.63%   99.63%   -0.01%     
==========================================
  Files          95       96       +1     
  Lines        9598     9591       -7     
  Branches      466      459       -7     
==========================================
- Hits         9563     9556       -7     
  Misses         20       20              
  Partials       15       15              
Flag Coverage Δ
python-3.10 99.63% <100.00%> (-0.01%) ⬇️
python-3.11 98.32% <100.00%> (-0.10%) ⬇️
python-3.12 99.63% <100.00%> (-0.01%) ⬇️
python-3.13 99.63% <100.00%> (-0.01%) ⬇️
python-3.14 99.63% <100.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
src/docket/_cancellation.py 100.00% <100.00%> (ø)
src/docket/_redis.py 100.00% <100.00%> (ø)
src/docket/dependencies/_concurrency.py 100.00% <100.00%> (ø)
src/docket/docket.py 100.00% <100.00%> (ø)
src/docket/strikelist.py 100.00% <100.00%> (ø)
src/docket/worker.py 100.00% <100.00%> (ø)
tests/test_cancellation.py 100.00% <100.00%> (ø)
tests/test_docket_keys.py 100.00% <100.00%> (ø)
tests/test_metrics_counters.py 100.00% <100.00%> (ø)
tests/test_redelivery.py 100.00% <100.00%> (ø)
... and 3 more

chrisguidry and others added 2 commits January 23, 2026 15:04
The while loop always exits via return when the stopping event is set,
so the condition's exit branch is never taken.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Registers cleanup callbacks immediately where resources are initialized
rather than grouping them in __aexit__. This follows the pattern of
registering the delattr/setattr callback BEFORE the async cleanup so
LIFO ordering ensures attributes are cleared AFTER resources close.

- _redis.py: Uses setattr(..., None) for conditional resources (cluster
  vs standalone mode)
- _result_store.py: Uses delattr for pool/client, removes | None types
- docket.py: Uses del self._stack pattern
- worker.py: Uses delattr with correct LIFO ordering
- strikelist.py: Always creates _stack in __aenter__, removes hasattr
  check since __aexit__ without __aenter__ is invalid
- _cancellation.py: Adds cancel_task() helper and sentinel constants
  for distinguishing internal vs external cancellation

Co-Authored-By: Claude Opus 4.5 <[email protected]>
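The ordering described above can be sketched as follows (attribute and helper names hypothetical): the delattr callback is registered BEFORE the async close, so LIFO order runs the close first and clears the attribute only afterwards.

```python
import asyncio
from contextlib import AsyncExitStack

class Pool:
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def aclose(self) -> None:
        self.log.append("pool closed")

def register_cleanup(obj, log: list[str]) -> AsyncExitStack:
    stack = AsyncExitStack()
    obj.pool = Pool(log)
    stack.callback(delattr, obj, "pool")        # runs second (LIFO)
    stack.push_async_callback(obj.pool.aclose)  # runs first
    return stack
```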
@chrisguidry chrisguidry marked this pull request as ready for review January 23, 2026 21:32
@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d3d5360cc6


Comment on lines 107 to 110
"""Connect to Redis when entering the context."""
if self.is_connected:
return self


P2: Preserve RedisConnection re-entrancy across nested contexts

__aenter__ now returns immediately when is_connected is true, but __aexit__ always assumes _stack exists and deletes it. In a nested or repeated use (e.g., async with conn: inside another async with conn: or manual __aenter__/__aexit__ pairs), the inner exit closes the connection and deletes _stack, so the outer exit raises AttributeError and skips cleanup. This is a regression from the previously idempotent exit behavior; consider ref-counting or avoiding _stack teardown on re-entry.


Comment on lines 236 to 240
    if self._redis is None:
        return self  # No Redis connection needed (local-only mode)

    if self._redis.is_connected:
        return self


P2: Avoid losing StrikeList cleanup on re-entrant __aenter__

StrikeList.__aenter__ now creates a new AsyncExitStack and returns early if _redis is already connected. If the same StrikeList is re-entered while connected (nested context or another caller enters it first), that new stack has no cleanup callbacks, so __aexit__ won’t cancel _monitor_task or close Redis, leaving the monitor task and connection running. Consider reusing the existing stack or guarding against re-entry to preserve cleanup.


chrisguidry and others added 4 commits January 23, 2026 17:19
A few cleanup items from the PR review:

- Fix Python 3.10/3.11 cancellation handling: on 3.11+ we can check the
  cancel message and re-raise external cancellations; on 3.10 the message
  doesn't propagate so we have to accept the uncertainty
- Add try-finally to all __aexit__ methods so `del self._stack` runs even
  if the stack cleanup raises
- Use `cancel_task` directly as AsyncExitStack callbacks instead of
  wrapper methods (strikelist, worker, concurrency)
- Split cleanup callbacks so each does one thing (cancel task vs clear
  attributes)
- Move protocol definition right above the function that uses it

Also improved asyncio task naming to include the docket/strikelist name
for better debugging and stack traces (e.g., "mydocket - scheduler"
instead of "docket.worker.scheduler").

Co-Authored-By: Claude Opus 4.5 <[email protected]>
- _cancellation.py line 63: only hit on 3.11+ when message propagates
- worker.py line 1181: branch for task not found in cancellation handler

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When a perpetual task called `perpetual.cancel()`, `_perpetuate_if_requested`
would call `docket.cancel(execution.key)` which publishes a pub/sub signal.
The cancellation listener (running in the same worker) would receive this
signal and cancel the very task that sent it - a self-cancellation race.

This showed up as flaky coverage: line 1047 (`return False`) was covered with
the memory backend but not with real Redis, because the in-memory pub/sub
doesn't give the listener time to process the message before the call returns.

The fix uses `_cancel()` directly for Redis cleanup without the pub/sub
signal. When a perpetual task says "don't reschedule me", we're completing
normally, not cancelling a running task - we ARE that task.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
RedisConnection and StrikeList were silently returning early if
__aenter__ was called twice, but __aexit__ would then try to clean
up resources from the wrong context. Instead of supporting re-entrancy,
we now assert to fail fast on misuse.

Sequential reuse (enter, exit, enter again) is still supported.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
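The fail-fast guard can be sketched like this (names hypothetical): concurrent re-entry trips an assertion, while sequential reuse still works because `__aexit__` removes the marker attribute.

```python
import asyncio
from contextlib import AsyncExitStack

class Guarded:
    async def __aenter__(self):
        assert not hasattr(self, "_stack"), "context already entered"
        self._stack = AsyncExitStack()
        return self

    async def __aexit__(self, *exc_info) -> None:
        try:
            await self._stack.aclose()
        finally:
            del self._stack  # cleared even if cleanup raised
```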
@chrisguidry chrisguidry changed the title Fix asyncio cancellation handling Unify cancellation and cleanup handling Jan 24, 2026
@chrisguidry chrisguidry merged commit 9c5cab5 into main Jan 24, 2026
40 checks passed
@chrisguidry chrisguidry deleted the fix-cancellation-handling branch January 24, 2026 01:58