feat: distributed notification queue + BLPOP elicitation for background tasks #2906
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 73ed3aada9
    if progress > 0:
        # Docket uses increment(), but we want absolute progress value
        # Calculate delta from current to set absolute position
        current = execution.progress.current or 0
        delta = int(progress) - current
Preserve fractional progress in background tasks
report_progress accepts floats, but the background-task path floors the value with int(progress) and only increments when delta > 0. If callers report fractional progress (e.g., 0.5 of 1.0), int(progress) becomes 0, so Redis progress stays at 0 until the next integer boundary; any consumer of execution.progress.current/total will see no gradual updates even though the foreground path supports fractional values. Consider scaling/rounding instead of truncating so fractional progress is reflected.
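A minimal sketch of the scaling idea, under assumptions: fractional progress is converted to an integer number of units before the delta is computed, so an increment-only counter can still reflect values such as 0.5. `SCALE` and `scaled_delta` are hypothetical names and not part of FastMCP or Docket; the only thing taken from the diff above is that progress is stored as an integer that can only be incremented.

```python
# Hypothetical helper: not part of FastMCP or Docket.
SCALE = 1000  # assumed resolution: 1.0 of reported progress == 1000 stored units


def scaled_delta(progress: float, current_units: int, scale: int = SCALE) -> int:
    """Return how many units to increment so the stored value matches `progress`.

    Rounds instead of truncating, and never returns a negative delta because
    the underlying counter is assumed to be increment-only.
    """
    target_units = round(progress * scale)
    return max(0, target_units - current_units)


# int(0.5) would be 0; with scaling, half of 1.0 becomes 500 stored units.
print(scaled_delta(0.5, current_units=0))
print(scaled_delta(0.75, current_units=500))
```

If something like this were adopted, the stored total would need the same scale factor so that consumers of current/total still see a consistent ratio.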
Test Failure Analysis
Summary: The Windows Python 3.10 test failed with a 5-second timeout in
Root Cause: The timeout occurs during
    # oauth_proxy.py:822
    key_value=DiskStore(directory=settings.home / "oauth-proxy")
The stack trace shows the hang happens in
Why This is NOT Related to the PR:
Evidence:
Detailed Analysis
Stack Trace Excerpt
What Changed in the PR
The PR adds fallback logic to
The new code path is only executed when:
None of these conditions are met in
Related Files
Suggested Solution: This is a flaky Windows test issue, not a PR regression. Options:
The PR changes are sound and all related tests pass. The failure is coincidental timing with an unrelated infrastructure issue.
73ed3aa to d44a623
d44a623 to a4a7640
Test Failure Analysis
Summary: Type checker (
Root Cause: The PR changes elicitation response handling from polling to BLPOP/LPUSH for efficiency. However, the Redis client's type signatures require:
Suggested Solution: Wrap the single key in a list for
Changes needed in
Detailed Analysis
The static analysis workflow failed on the "Run prek" step with 4 type diagnostics:
From the Redis Python client type signature:
    def blpop(
        self, keys: List, timeout: Optional[Number] = 0
    ) -> Union[Awaitable[list], list]:
The
The second error:
This suggests the type checker can't determine if
Related Files
Test Failure Analysis
Summary: Type checker () is failing because expects a list of keys as the first argument, but the code is passing a single string.
Root Cause: The PR changes the elicitation mechanism from polling with to blocking with . However, the method in redis-py requires its first argument to be a list of keys, not a single string:
    # Current code (line 149):
    result = await redis.blpop(
        docket.key(response_key),  # ❌ Single string
        timeout=max_wait_seconds,
    )
    # Should be:
    result = await redis.blpop(
        [docket.key(response_key)],  # ✅ List of keys
        timeout=max_wait_seconds,
    )
The type errors are:
Suggested Solution:
Detailed Analysis
The redis-py library's
    def blpop(self, keys: List, timeout: Optional[Number] = 0) -> Union[Awaitable[list], list]
BLPOP is designed to accept multiple keys and will pop from the first non-empty list. The single-key use case still requires wrapping the key in a list. The return type
Type Errors Breakdown:
src/fastmcp/server/tasks/elicitation.py:149:28 The type checker can't determine the return type because the arguments don't match the method signature.
src/fastmcp/server/tasks/elicitation.py:150:17 The
Related Files
References:
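To make the BLPOP behaviour described above concrete, here is a small in-memory sketch. It is not redis-py and does not touch a Redis server: `FakeListStore` is a hypothetical stand-in that only mimics the semantics the analysis relies on (keys passed as a list, pop from the first non-empty list, `None` on timeout). It assumes Python 3.10 or newer.

```python
import asyncio
from collections import defaultdict, deque
from typing import Optional


class FakeListStore:
    """In-memory stand-in that mimics BLPOP/LPUSH semantics (not redis-py)."""

    def __init__(self) -> None:
        self._lists: dict[str, deque[str]] = defaultdict(deque)
        self._changed = asyncio.Condition()

    async def lpush(self, key: str, value: str) -> int:
        """Push to the head of the list and wake any blocked waiters."""
        async with self._changed:
            self._lists[key].appendleft(value)
            self._changed.notify_all()
            return len(self._lists[key])

    async def blpop(self, keys: list[str], timeout: float) -> Optional[tuple[str, str]]:
        """Pop from the first non-empty list in `keys`, or return None on timeout."""

        def first_ready() -> Optional[str]:
            return next((k for k in keys if self._lists[k]), None)

        async with self._changed:
            try:
                await asyncio.wait_for(
                    self._changed.wait_for(lambda: first_ready() is not None),
                    timeout,
                )
            except asyncio.TimeoutError:
                return None
            key = first_ready()
            assert key is not None
            return key, self._lists[key].popleft()


async def demo_blpop() -> Optional[tuple[str, str]]:
    store = FakeListStore()
    await store.lpush("response:task-1", '{"action": "accept"}')
    # Keys are passed as a list even when waiting on a single key.
    return await store.blpop(["response:task-1"], timeout=0.1)


print(asyncio.run(demo_blpop()))
```

The point of the sketch is the call shape: a single blocking wait on a list of keys replaces a polling loop, and the waiter wakes as soon as a producer pushes.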
Test Failure Analysis
Summary: Static analysis (type checking) failed due to incorrect argument types for Redis
Root Cause: The PR refactored from polling with
Suggested Solution:
Fix 1: Wrap the key in a list for
    # Line 149-151
    result = await redis.blpop(
        [docket.key(response_key)],  # Wrap in list
        timeout=max_wait_seconds,
    )
Fix 2: Cast or handle the
    # Line 238-241
    await redis.lpush(
        docket.key(response_key),
        json.dumps(response),
    )
The
Detailed Analysis
Type Checker Output
From the logs:
redis-py Signatures
From the redis-py library:
The
Related Files
3839a16 to 9a62dab
Test Failure Analysis
Summary: All tests are failing on import with
Root Cause: The new file
    from datetime import UTC, datetime
However,
Suggested Solution: Use
Change needed in
    # Current (line 25):
    from datetime import UTC, datetime
    # Should be:
    from datetime import datetime, timezone
Then update any usage of
Detailed Analysis
Error from all test jobs:
The error occurs during test collection when importing
Python version compatibility:
Existing codebase pattern:
The rest of the FastMCP codebase consistently uses
Related Files
Updated analysis based on latest workflow run (21670322009) - 2026-02-04
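For illustration, a version-portable form of the import discussed above. `datetime.UTC` was added in Python 3.11, while `timezone.utc` is also available on 3.10. `utc_now_iso` is a hypothetical helper, not a function from notifications.py.

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string (hypothetical helper)."""
    # timezone.utc works on Python 3.10; `from datetime import UTC` would not.
    return datetime.now(timezone.utc).isoformat()


stamp = utc_now_iso()
print(stamp)
```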
Test Failure Analysis
Summary: All Python 3.10 tests are failing due to
Root Cause: The new file
Suggested Solution: Replace
In
Detailed Analysis
The error appears in all Python 3.10 test jobs:
All failures show the same stack trace:
The Python 3.13 tests pass because
Python version compatibility:
Related Files
File with issue:
Files using the correct pattern (examples):
All use
Updated analysis based on workflow run 21672391648 - 2026-02-04 13:00 UTC
10acb17 to 936fab8
hey team 👋 follow-up to #2905 — this adds distributed notification support so elicitation actually works for background tasks running on remote workers. ran a deep review loop and caught a few things worth mentioning: fixes included in this pr:
tests: 280 passed, 3 skipped. rebased on main. happy to walk through any of the implementation details if that would help. is there anything specific you'd like me to adjust before this gets merged? |
936fab8 to 5f70960
Test Failure Analysis
Summary: All Python 3.10 tests fail with
Root Cause: The PR adds
    from datetime import UTC, datetime
However,
Suggested Solution: Replace
    # In src/fastmcp/server/tasks/notifications.py line 25
    from datetime import datetime, timezone
    # Then use timezone.utc instead of UTC throughout the file
    # Example: datetime.now(timezone.utc) instead of datetime.now(UTC)
Detailed Analysis
All failing jobs show the same import chain:
The import happens at module load time, so all tests fail immediately before any test code runs.
Failed jobs:
Only Python 3.13 tests passed (where
Related Files
This is looking really compelling so far, @gfortaine, thank you! However, similar to the last PR, I'm having trouble with the style of the tests here. I think these heavily mocked tests are going to end up being difficult to maintain over time, because they are mocking many internal implementation details (including of redis itself). Take a look at the surrounding task-related tests and you'll see a very stark difference in style. Can we rework these tests to be simpler by just setting up example MCP servers with task support that make elicitations and then assert that the client sees those elicitations? We should be able to write tests that just "do the thing" because we have redis available (via the In our
Address @chrisguidry's review feedback on PR PrefectHQ#2906:
- Rewrite test_context_background_task.py (1303→345 lines): replace heavily mocked tests with Client(mcp) integration tests using memory:// Docket backend. Zero mocks for Redis, sessions, or Docket internals.
- Rewrite test_notifications.py (626→104 lines): replace mock-heavy unit tests with 2 E2E integration tests that exercise the full notification pipeline through real Docket.
- Fix Python 3.10 CI failure: replace datetime.UTC (3.11+) with datetime.timezone.utc in notifications.py.
- Fix report_progress bug: ExecutionProgress has no set_current() method; use increment() with delta tracking instead.
Test coverage: 18 tests, all passing in 2.4s.
Test Failure Analysis
Summary: Static type checking failed due to incorrect argument types for Redis blocking operations (
Root Cause: The Redis
Suggested Solution: Wrap all Redis blocking operation keys in a list:
These changes will fix all the type errors because
Detailed Analysis
Type Errors Found
The type checker (
All stem from the same root cause: the Redis client library type stubs indicate these methods accept
Why This Matters
From the Redis documentation:
The Python redis library reflects this by requiring a list, even for single-key operations.
Related Files
Updated: 2026-02-07 (latest static analysis failure)
Test Failure Analysis
Summary: Static type checking failed with 11 type errors in Redis blocking operations (
Root Cause:
Suggested Solution:
Fix 1: Wrap Redis keys in lists
    # src/fastmcp/server/tasks/elicitation.py:175-178
    result = await redis.blpop(
        [docket.key(response_key)],  # Wrap in list
        timeout=max_wait_seconds,
    )
    # src/fastmcp/server/tasks/notifications.py:107-109
    result = await redis.brpop(
        [queue_key],  # Wrap in list
        timeout=SUBSCRIBER_TIMEOUT_SECONDS
    )
Fix 2: Add type ignores for lpush union types
    # src/fastmcp/server/tasks/elicitation.py:264
    await redis.lpush(  # type: ignore[misc]
        docket.key(response_key),
        json.dumps(response),
    )
    # src/fastmcp/server/tasks/notifications.py:70
    await redis.lpush(key, message)  # type: ignore[misc]
    # src/fastmcp/server/tasks/notifications.py:132
    await redis.lpush(queue_key, json.dumps(message))  # type: ignore[misc]
Fix 3: Handle None values in test
    # tests/server/tasks/test_notifications.py:53-59
    success = await handle_task_input(
        task_id=captured["task_id"] or "",  # Provide fallback
        session_id=captured["session_id"] or "",  # Provide fallback
        action="accept",
        content={"value": "hello"},
        fastmcp=mcp,
    )
Detailed Analysis
Type Errors from Workflow Run 21789768033
The static analysis (
1. Invalid await on blpop (elicitation.py:175)
2-3. Invalid argument types for blpop (elicitation.py:176)
4. Invalid await on lpush (elicitation.py:264)
5. Invalid await on lpush (notifications.py:70)
6-8. Invalid await/argument for brpop (notifications.py:107-108)
9. Invalid await on lpush (notifications.py:132)
10-11. Test argument type errors (test_notifications.py:54-55)
Why Redis Methods Require Lists
Both
Even for single-key use cases, the Python client requires a list to match the command signature.
Why lpush Has Union Types
The redis-py library supports both synchronous and async clients with a shared type hierarchy. This creates union return types like
Related Files
Files with type errors:
Reference:
Updated: 2026-02-08 01:00 UTC (Workflow run 21789768033)
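As a possible alternative to the type-ignore comments in Fix 2, a union of "awaitable or plain value" can be narrowed at runtime. The sketch below is an assumption about how that might look, not something the PR does: `resolve` and `fake_async_lpush` are hypothetical names, and whether the project's type checker accepts this without an ignore is not verified here.

```python
import asyncio
import inspect
from typing import Awaitable, TypeVar, Union

T = TypeVar("T")


async def resolve(value: Union[Awaitable[T], T]) -> T:
    """Await `value` if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


async def fake_async_lpush() -> int:
    # Stand-in for an async client call that returns the new list length.
    return 1


async def demo_resolve() -> tuple[int, int]:
    from_coroutine = await resolve(fake_async_lpush())
    from_plain_value = await resolve(2)
    return from_coroutine, from_plain_value


print(asyncio.run(demo_resolve()))
```

The trade-off is one extra runtime check per call in exchange for keeping suppression comments out of the call sites.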
Add Redis-backed notification queue (LPUSH/BRPOP) enabling the MCP server to notify clients about background task events like elicitation requests.
- notifications.py: subscriber management with weakref tracking, retry logic, TTL expiration, and graceful shutdown
- __init__.py: export ensure_subscriber_running, push_notification, stop_subscriber
- context.py: report_progress uses delta tracking via increment() instead of set_current() (which doesn't exist), stores progress in Redis for background tasks
- elicitation.py: replace polling with BLPOP for efficient blocking wait, fail-fast on notification push failure, use get_task_context() for authoritative session_id
- handlers.py: subscriber cleanup on session disconnect via _exit_stack.push_async_callback()
Replace 1300+ lines of mock-heavy unit tests with 391 lines of integration tests using real Client(mcp) connections and memory:// Docket backend.
- test_context_background_task.py: 17 tests covering report_progress delta tracking, elicitation flow, edge cases, and fail-fast on push failure
- test_notifications.py: 2 E2E tests for notification queue lifecycle
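The fail-fast behaviour named in these commits can be sketched roughly as follows. This is an illustration under assumptions, not the code in elicitation.py: `elicit_with_fail_fast`, `push`, and `wait_for_response` are hypothetical names, and treating a timeout as "cancel" is an assumption made for the sketch.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def elicit_with_fail_fast(
    push: Callable[[], Awaitable[None]],
    wait_for_response: Callable[[float], Awaitable[Optional[str]]],
    max_wait_seconds: float,
) -> str:
    """Push the request, then block for a response; skip the wait if the push fails."""
    try:
        await push()
    except Exception:
        # The client can never see the request, so waiting would only stall.
        return "cancel"
    response = await wait_for_response(max_wait_seconds)
    # Assumption for this sketch: no response within the window counts as cancel.
    return response if response is not None else "cancel"


async def failing_push() -> None:
    raise ConnectionError("queue unavailable")


async def never_called(timeout: float) -> Optional[str]:
    raise AssertionError("should not wait when the push failed")


print(asyncio.run(elicit_with_fail_fast(failing_push, never_called, 3600)))
```

Without the early return, a failed push would leave the task blocked for the full wait window even though no client could ever answer.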
b2ca066 to 1f84e50
🤖 Generated with Codex
@chrisguidry totally fair — mocking redis internals creates maintenance debt and the contrast with the surrounding task tests was stark. rewrote everything.
also switched to rebased on |
chrisguidry left a comment
@gfortaine It's a thing of beauty, thank you! Really appreciate the readability of the test now and that you fixed up the previous round of them too. Also great catch finding the SDK's correct notification types. When I originally pulled this all together, the SDK was in flux and only partially implemented too. I think things have definitely firmed up there since the original implementation.
  1. Set task status to "input_required" via Redis
- 2. Send notifications/tasks/updated with elicitation metadata
+ 2. Send notifications/tasks/status with elicitation metadata
…fectHQ#3121)
Merges two orthogonal PRs into a single coherent changeset:
From PrefectHQ#2906 - Distributed notification queue & BLPOP elicitation:
- Add notifications.py: LPUSH/BRPOP notification queue for background tasks
- Replace polling-based elicitation with single BLPOP call
- Fail-fast on notification push failure (return cancel immediately)
- Add Docket ExecutionProgress API for report_progress() in background tasks
- Wire notification subscriber lifecycle on session exit stack
- Use typed TaskStatusNotification.model_validate() in handlers
From PrefectHQ#3121 - Access token snapshot & lifespan_context fallback:
- Snapshot AccessToken to Redis alongside task metadata
- Add _task_access_token ContextVar restored on Docket worker entry
- Extend get_access_token() with expiry-aware fallback chain
- Add lifespan_context property fallback to server._lifespan_result
Reviewer fixes applied to PrefectHQ#3121:
- Replace silent except with logger.warning()
- Use _current_docket before DocketDependency.docket.get() fallback
- Hoist datetime import to module level
- Type _access_token_cv_token as Token[...] | None instead of Any
Tests: 27 new/consolidated tests (de-mocked), 264 total tasks/ tests pass.
Summary
Implements reliable notification delivery and efficient elicitation for background tasks (SEP-1686).
Fixes #2904 (statusMessage not forwarded), addresses feedback from #2877 and #2568.
Changes
notifications.py (new, 256 lines)
Distributed notification queue using LPUSH/BRPOP pattern. Workers push to a Redis queue per session, MCP server process subscribes and forwards to the client. Retry up to 3x on delivery failure, TTL-based expiration for stale messages.
elicitation.py
- cancel immediately instead of blocking for 1 hour
- get_task_context() (authoritative source) instead of session attribute
- TaskStatusNotification.model_validate()
context.py
- report_progress() falls back to Docket's ExecutionProgress when no progressToken (background tasks)
- (_fastmcp_last_progress) with increment() since Docket only exposes relative progress
handlers.py
- notifications/tasks/status notification before queueing (typed, not raw JSONRPC)
Tests (zero mocks)
- test_context_background_task.py: 896 → 394 lines. Client(mcp) + memory:// backend.
- test_notifications.py: 165 new lines. NotificationCaptureHandler verifies E2E notification metadata.
- TestElicitFailFast patches push_notification to test fail-fast behavior.
Notes
- ty static analysis clean
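A rough sketch of the delivery policy described for notifications.py (bounded retries plus TTL-based expiration), kept free of Redis so it can run on its own. Everything here is illustrative: `Notification`, `deliver`, and `MAX_ATTEMPTS` are hypothetical names, the monotonic deadline is an assumed way to represent the TTL, and "retry up to 3x" is read here as three attempts in total.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable

MAX_ATTEMPTS = 3  # assumed reading of "retry up to 3x": three attempts in total


@dataclass
class Notification:
    payload: str
    expires_at: float  # monotonic deadline; assumed representation of the TTL


async def deliver(
    note: Notification,
    send: Callable[[str], Awaitable[None]],
    now: Callable[[], float] = time.monotonic,
) -> str:
    """Return "expired", "delivered", or "failed" for one notification."""
    if now() >= note.expires_at:
        return "expired"  # stale message: drop without attempting delivery
    for _attempt in range(MAX_ATTEMPTS):
        try:
            await send(note.payload)
            return "delivered"
        except Exception:
            continue  # a real implementation might back off or re-queue here
    return "failed"


async def demo_delivery() -> list[str]:
    calls = 0

    async def flaky_send(payload: str) -> None:
        # Fails on the first two calls, then succeeds.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("client not reachable yet")

    fresh = Notification("elicitation request", expires_at=time.monotonic() + 60)
    stale = Notification("old update", expires_at=time.monotonic() - 1)
    return [await deliver(fresh, flaky_send), await deliver(stale, flaky_send)]


print(asyncio.run(demo_delivery()))
```

Checking expiry before the first attempt keeps a reconnecting client from receiving updates about tasks that have long since moved on.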