
feat: distributed notification queue + BLPOP elicitation for background tasks #2906

Merged
chrisguidry merged 6 commits into PrefectHQ:main from gfortaine:fix/statusMessage-not-forwarded
Feb 10, 2026

Conversation


@gfortaine gfortaine commented Jan 18, 2026

Summary

Implements reliable notification delivery and efficient elicitation for background tasks (SEP-1686).

Fixes #2904 (statusMessage not forwarded), addresses feedback from #2877 and #2568.

Changes

notifications.py (new — 256 lines)

Distributed notification queue using LPUSH/BRPOP pattern. Workers push to a Redis queue per session, MCP server process subscribes and forwards to the client. Retry up to 3x on delivery failure, TTL-based expiration for stale messages.
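The queue discipline can be sketched with an in-memory stand-in for Redis, so it runs without a server. All class and key names here are illustrative, not the FastMCP implementation:

```python
# Minimal sketch of the per-session LPUSH/BRPOP queue: workers push
# serialized notifications, the server process blocks until one arrives.
import asyncio
import json

class FakeRedis:
    """In-memory stand-in supporting just enough LPUSH/BRPOP semantics."""
    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}

    def _q(self, key: str) -> asyncio.Queue:
        return self._queues.setdefault(key, asyncio.Queue())

    async def lpush(self, key: str, value: str) -> int:
        # Real Redis prepends at the head; paired with BRPOP (pop from the
        # tail) this yields FIFO delivery, which a plain Queue models.
        await self._q(key).put(value)
        return self._q(key).qsize()

    async def brpop(self, keys: list[str], timeout: float = 0):
        # Real BRPOP takes a *list* of keys and blocks until one has data.
        key = keys[0]
        try:
            value = await asyncio.wait_for(self._q(key).get(), timeout or None)
            return (key, value)
        except asyncio.TimeoutError:
            return None

async def main():
    redis = FakeRedis()
    queue_key = "fastmcp:notifications:session-1"  # hypothetical key scheme

    # Worker side: push a status notification onto the session's queue.
    await redis.lpush(queue_key, json.dumps(
        {"method": "notifications/tasks/status", "taskId": "t1"}))

    # Server side: block until a notification arrives, then forward it.
    popped = await redis.brpop([queue_key], timeout=1)
    assert popped is not None
    _, raw = popped
    return json.loads(raw)

message = asyncio.run(main())
print(message["taskId"])  # → t1
```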

elicitation.py

  • Replaced polling (7,200 round-trips/hour) with BLPOP (single blocking call)
  • Fail-fast: if notification push fails, return cancel immediately instead of blocking for 1 hour
  • Session ID from get_task_context() (authoritative source) instead of session attribute
  • Typed notifications via TaskStatusNotification.model_validate()
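For context on the round-trip figure above, a quick back-of-the-envelope check (the 0.5 s polling cadence is an assumption implied by the 7,200/hour number, not stated in the diff):

```python
# Polling every 0.5 s over a 1-hour elicitation window versus a single
# blocking BLPOP call that returns as soon as the response is LPUSHed.
POLL_INTERVAL_S = 0.5   # assumed polling cadence
WINDOW_S = 3600         # 1-hour elicitation timeout

polling_round_trips = int(WINDOW_S / POLL_INTERVAL_S)
blpop_round_trips = 1   # one blocking call; Redis holds the wait server-side

print(polling_round_trips)  # → 7200
```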

context.py

  • report_progress() falls back to Docket's ExecutionProgress when no progressToken (background tasks)
  • Uses delta tracking (_fastmcp_last_progress) with increment() since Docket only exposes relative progress
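The delta-tracking idea can be illustrated with a minimal stand-in for a store that, like Docket's ExecutionProgress, only exposes a relative increment; the class and attribute names below are illustrative, not the actual API:

```python
# Callers report *absolute* progress, but the backing store only supports
# relative increments - so we remember the last absolute value and emit
# the difference (the role played by _fastmcp_last_progress in the PR).

class RelativeProgress:
    """Stand-in for a store that only supports relative increments."""
    def __init__(self):
        self.current = 0

    def increment(self, delta: int) -> None:
        self.current += delta

class ProgressAdapter:
    def __init__(self, store: RelativeProgress):
        self._store = store
        self._last = 0  # last absolute value we reported

    def report_absolute(self, progress: float) -> None:
        delta = int(progress) - self._last
        if delta > 0:
            self._store.increment(delta)
            self._last = int(progress)

store = RelativeProgress()
adapter = ProgressAdapter(store)
adapter.report_absolute(3)
adapter.report_absolute(7)
adapter.report_absolute(7)   # repeat report: delta is 0, no update
print(store.current)  # → 7
```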

handlers.py

  • Initial notifications/tasks/status notification before queueing (typed, not raw JSONRPC)
  • Subscriber lifecycle management: starts on first task, cleans up on session disconnect

Tests (zero mocks)

  • test_context_background_task.py: 896 → 394 lines. Client(mcp) + memory:// backend.
  • test_notifications.py: 165 new lines. NotificationCaptureHandler verifies E2E notification metadata.
  • Only surgical mock: TestElicitFailFast patches push_notification to test fail-fast behavior.

Notes

  • Rebased on v3.0.0b2 (4 commits)
  • All 256 task tests pass, ty static analysis clean
  • Backward compatible — foreground progress behavior unchanged

marvin-context-protocol Bot added the bug (Something isn't working. Reports of errors, unexpected behavior, or broken functionality.) and server (Related to FastMCP server implementation or server-side functionality.) labels on Jan 18, 2026

chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 73ed3aada9


Comment thread: src/fastmcp/server/context.py (outdated)
Comment on lines +363 to +367
if progress > 0:
# Docket uses increment(), but we want absolute progress value
# Calculate delta from current to set absolute position
current = execution.progress.current or 0
delta = int(progress) - current


P2: Preserve fractional progress in background tasks

report_progress accepts floats, but the background-task path floors the value with int(progress) and only increments when delta > 0. If callers report fractional progress (e.g., 0.5 of 1.0), int(progress) becomes 0, so Redis progress stays at 0 until the next integer boundary; any consumer of execution.progress.current/total will see no gradual updates even though the foreground path supports fractional values. Consider scaling/rounding instead of truncating so fractional progress is reflected.
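One sketch of the reviewer's suggestion, scaling fractional progress into an integer domain before computing deltas; the scale factor and helper are hypothetical, not the merged fix:

```python
# Scale fractional progress into integer hundredths so 0.5/1.0 still
# produces a visible update instead of flooring to 0.
SCALE = 100  # hypothetical: store progress as integer hundredths

def to_store_units(progress: float) -> int:
    return round(progress * SCALE)

# The truncating behavior flagged in the review: fractions floor to zero.
assert int(0.5) == 0

# Scaled path: fractional values survive the integer conversion.
current = 0
for reported in (0.25, 0.5, 0.75, 1.0):
    delta = to_store_units(reported) - current
    if delta > 0:
        current += delta
print(current)  # → 100
```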


@marvin-context-protocol

Test Failure Analysis

Summary: The Windows Python 3.10 test failed with a 5-second timeout in test_init_with_explicit_params from test_workos.py. This appears to be an unrelated, pre-existing Windows-specific issue with SQLite disk access, not caused by the PR changes.

Root Cause: The timeout occurs during WorkOSProvider initialization when creating a DiskStore object, specifically at:

# oauth_proxy.py:822
key_value=DiskStore(directory=settings.home / "oauth-proxy")

The stack trace shows the hang happens in sqlite3.connect() within the diskcache library. This is a known Windows issue where SQLite can timeout due to file locking or disk access delays.

Why This is NOT Related to the PR:

  1. The PR only modifies Context.report_progress() in context.py
  2. The failing test (test_init_with_explicit_params) never calls report_progress() - it only creates a provider object
  3. The import added by the PR (from docket.dependencies import Dependency) is inside the report_progress method and only executed when the method is called
  4. All other test suites passed, including Ubuntu Python 3.10, 3.13, integration tests, and lowest-direct dependencies

Evidence:

  • Only Windows Python 3.10 failed; Ubuntu Python 3.10/3.13 passed ✅
  • The timeout is in provider initialization (DiskStore/SQLite), not progress reporting
  • The failing test doesn't exercise any code path modified by this PR
Detailed Analysis

Stack Trace Excerpt

File "oauth_proxy.py", line 822, in __init__
    key_value=DiskStore(directory=settings.home / "oauth-proxy"),
File "diskcache/core.py", line 623, in _con
    con = self._local.con = sqlite3.connect(
+++++++++++++++++++++++++++++++++++ Timeout +++++++++++++++++++++++++++++++++++

What Changed in the PR

The PR adds fallback logic to Context.report_progress():

  • If progressToken exists: send MCP notification (existing behavior)
  • If no progressToken: update Docket execution progress in Redis (new behavior)

The new code path is only executed when:

  1. report_progress() is called
  2. There's no progressToken
  3. Docket is available

None of these conditions are met in test_init_with_explicit_params.

Related Files
  • Modified: src/fastmcp/server/context.py - Added background task progress tracking
  • Failing test: tests/server/auth/providers/test_workos.py:17 - Provider initialization
  • Timeout location: src/fastmcp/server/auth/oauth_proxy.py:822 - DiskStore creation

Suggested Solution:

This is a flaky Windows test issue, not a PR regression. Options:

  1. Re-run the Windows tests to see if it passes on retry (recommended)
  2. Investigate the Windows SQLite timeout separately as a flaky test issue
  3. Consider adding a longer timeout or mark as flaky specifically for Windows

The PR changes are sound and all related tests pass. The failure is coincidental timing with an unrelated infrastructure issue.


coderabbitai Bot commented Jan 18, 2026

Walkthrough

The report_progress method in the Context class has been updated to support an optional message parameter and implement dual-context progress reporting. When a progress_token is available, it sends MCP foreground progress notifications. Otherwise, it attempts to update Docket background progress in Redis by retrieving the current Execution, computing progress deltas, and propagating messages. The implementation includes graceful error handling for unavailable Docket contexts and conditional background progress handling that depends on Docket availability and worker context.

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
  • Title check ⚠️ Warning — The PR title references distributed notification queue and BLPOP, but the actual changes focus on fixing report_progress for background tasks and adding a message parameter. Resolution: update the title to reflect the main change, such as 'fix: report_progress stores messages and progress in Redis for background tasks' or similar.
✅ Passed checks (4 passed)
  • Linked Issues check ✅ Passed — The PR successfully addresses issue #2904 by implementing dual-context handling in Context.report_progress: foreground MCP progress notifications when progressToken exists, and background Docket progress storage via Redis when running in worker context.
  • Out of Scope Changes check ✅ Passed — All changes are focused and in-scope: modifications to the Context.report_progress method signature and implementation to handle background task progress storage, with no unrelated alterations.
  • Docstring Coverage ✅ Passed — Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.
  • Description check ✅ Passed — The description provides comprehensive details about the changes, objectives, and testing, but is missing the required contributors/review checklists from the template.


@marvin-context-protocol

Test Failure Analysis

Summary: Type checker (ty) is failing because redis.blpop() expects a list of keys but is being passed a single string, and redis.lpush() has ambiguous return type handling.

Root Cause: The PR changes elicitation response handling from polling to BLPOP/LPUSH for efficiency. However, the Redis client's type signatures require:

  1. blpop() expects keys: List (multiple keys to check), not a single string
  2. lpush() may return different types (sync/async) depending on the Redis client variant, causing type ambiguity

Suggested Solution: Wrap the single key in a list for blpop() and ensure proper await handling for lpush().

Changes needed in src/fastmcp/server/tasks/elicitation.py:

  1. Line 149-152: Change blpop() to accept a list:

    result = await redis.blpop(
        [docket.key(response_key)],  # Wrap in list
        timeout=max_wait_seconds,
    )
  2. Line 230-233: Ensure lpush() result is properly awaited (likely already correct, but type checker needs clarity):

    await redis.lpush(
        docket.key(response_key),
        json.dumps(response),
    )

    If the issue persists, may need to cast or annotate the redis client type more explicitly.

Detailed Analysis

The static analysis workflow failed on the "Run prek" step with 4 type diagnostics:

error[invalid-argument-type]: Argument to bound method `blpop` is incorrect
   --> src/fastmcp/server/tasks/elicitation.py:150:17
    |
150 |                 docket.key(response_key),
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^ Expected `list[Unknown]`, found `Unknown | str`

From the Redis Python client type signature:

def blpop(
    self, keys: List, timeout: Optional[Number] = 0
) -> Union[Awaitable[list], list]:

The blpop command is designed to check multiple keys and return when any of them has data. Even when checking a single key, it must be passed as a list.

The second error:

error[invalid-await]: `Unknown | Awaitable[int] | int` is not awaitable
   --> src/fastmcp/server/tasks/elicitation.py:230:15

This suggests the type checker can't determine if lpush returns an awaitable. This is likely due to the union type from Redis/RedisCluster compatibility.

Related Files
  • src/fastmcp/server/tasks/elicitation.py:149-152 - blpop() call needs list wrapper
  • src/fastmcp/server/tasks/elicitation.py:230-233 - lpush() call with potential type ambiguity

@marvin-context-protocol

Test Failure Analysis

Summary: Type checker (ty) is failing because blpop() expects a list of keys as the first argument, but the code is passing a single string.

Root Cause: The PR changes the elicitation mechanism from polling with redis.get() to blocking with redis.blpop(). However, the blpop() method in redis-py requires its first argument to be a list of keys, not a single string:

# Current code (line 149):
result = await redis.blpop(
    docket.key(response_key),  # ❌ Single string
    timeout=max_wait_seconds,
)

# Should be:
result = await redis.blpop(
    [docket.key(response_key)],  # ✅ List of keys
    timeout=max_wait_seconds,
)

The type errors are:

  1. Line 149: invalid-await - Type checker sees return type as ambiguous because it can't match the call signature
  2. Line 150: invalid-argument-type - Expected list[Unknown], found Unknown | str
  3. Line 238: invalid-await - Similar issue with redis.lpush() which also needs proper type hints

Suggested Solution:

  1. Fix the blpop call by wrapping the key in a list:

    result = await redis.blpop(
        [docket.key(response_key)],
        timeout=max_wait_seconds,
    )
  2. Fix the lpush call similarly if needed (line 238) - check if lpush also requires a list

Detailed Analysis

The redis-py library's blpop() signature is:

def blpop(self, keys: List, timeout: Optional[Number] = 0) -> Union[Awaitable[list], list]

BLPOP is designed to accept multiple keys and will pop from the first non-empty list. The single-key use case still requires wrapping the key in a list.

The return type Union[Awaitable[list], list] indicates redis-py supports both sync and async modes, which is why the await is correct, but the type checker can't determine this when the argument types don't match.

Type Errors Breakdown:

src/fastmcp/server/tasks/elicitation.py:149:28

error[invalid-await]: `Unknown | Awaitable[list[Unknown]] | list[Unknown]` is not awaitable

The type checker can't determine the return type because the arguments don't match the method signature.

src/fastmcp/server/tasks/elicitation.py:150:17

error[invalid-argument-type]: Expected `list[Unknown]`, found `Unknown | str`

The docket.key(response_key) returns a string, but blpop expects a list of strings.

Related Files
  • src/fastmcp/server/tasks/elicitation.py:149-152 - blpop call needs list argument
  • src/fastmcp/server/tasks/elicitation.py:238-241 - lpush call may need similar fix


@marvin-context-protocol

Test Failure Analysis

Summary: Static analysis (type checking) failed due to incorrect argument types for Redis blpop and lpush methods in the elicitation refactoring.

Root Cause: The PR refactored from polling with redis.get()/redis.set() to blocking operations with redis.blpop()/redis.lpush(), but the argument types don't match the redis-py signatures:

  1. Line 150: redis.blpop() expects keys: List but receives a single string from docket.key(response_key)
  2. Line 238: redis.lpush() isn't being properly awaited according to the type checker (union type issue)

Suggested Solution:

Fix 1: Wrap the key in a list for blpop:

# Line 149-151
result = await redis.blpop(
    [docket.key(response_key)],  # Wrap in list
    timeout=max_wait_seconds,
)

Fix 2: Cast or handle the lpush return type properly:

# Line 238-241
await redis.lpush(
    docket.key(response_key),
    json.dumps(response),
)

The lpush issue might be a false positive from the type checker due to the union type Unknown | Awaitable[int] | int. Since you're already using await, this should work at runtime, but you might need to add a type ignore comment if the type checker continues to complain after fixing the blpop issue.

Detailed Analysis

Type Checker Output

From the logs:

error[invalid-argument-type]: Argument to bound method `blpop` is incorrect
   --> src/fastmcp/server/tasks/elicitation.py:150:17
    |
150 |                 docket.key(response_key),
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^ Expected `list[Unknown]`, found `Unknown | str`
error[invalid-await]: `Unknown | Awaitable[int] | int` is not awaitable
   --> src/fastmcp/server/tasks/elicitation.py:238:15
    |
238 |           await redis.lpush(
    |  _______________^
239 | |             docket.key(response_key),
240 | |             json.dumps(response),
241 | |         )
    | |_________^

redis-py Signatures

From the redis-py library:

  • blpop(self, keys: List, timeout: Union[int, float, NoneType] = 0) -> Union[Awaitable[list], list]
  • lpush(self, name: Union[bytes, str, memoryview], *values: Union[bytes, bytearray, memoryview, str, int, float]) -> Union[Awaitable[int], int]

The blpop method explicitly requires a List for the keys parameter, supporting multiple keys to check in order. Even when blocking on a single key, it must be wrapped in a list.

Related Files
  • src/fastmcp/server/tasks/elicitation.py:150 - blpop call needs list wrapper
  • src/fastmcp/server/tasks/elicitation.py:238 - lpush call has type checker issue

gfortaine force-pushed the fix/statusMessage-not-forwarded branch from 3839a16 to 9a62dab on February 3, 2026 19:37

marvin-context-protocol Bot commented Feb 3, 2026

Test Failure Analysis

Summary: All tests are failing on import with ImportError: cannot import name 'UTC' from 'datetime' in the newly added notifications.py file.

Root Cause: The new file src/fastmcp/server/tasks/notifications.py imports UTC from the datetime module at line 25:

from datetime import UTC, datetime

However, datetime.UTC was only added in Python 3.11. FastMCP supports Python ≥3.10 (as stated in CLAUDE.md and pyproject.toml), and the CI is testing on Python 3.10.19, causing an immediate import error that prevents any tests from running.

Suggested Solution: Use timezone.utc instead, which is the Python 3.10-compatible approach already used throughout the codebase.

Change needed in src/fastmcp/server/tasks/notifications.py:25:

# Current (line 25):
from datetime import UTC, datetime

# Should be:
from datetime import datetime, timezone

Then update any usage of UTC in the file to timezone.utc.
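A minimal illustration of the 3.10-compatible spelling (timezone.utc has existed since Python 3.2, while datetime.UTC was only added in 3.11):

```python
# Python 3.10-compatible UTC timestamp, matching the pattern already used
# throughout the FastMCP codebase.
from datetime import datetime, timezone

enqueued_at = datetime.now(timezone.utc).isoformat()

# On 3.11+, `from datetime import UTC` gives the same object as
# timezone.utc, so the two spellings are interchangeable there.
print(enqueued_at.endswith("+00:00"))  # → True
```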

Detailed Analysis

Error from all test jobs:

ImportError: cannot import name 'UTC' from 'datetime' 
(/home/runner/work/_temp/uv-python-dir/cpython-3.10.19-linux-x86_64-gnu/lib/python3.10/datetime.py)

The error occurs during test collection when importing conftest.py, which triggers the import chain:

  1. fastmcp → fastmcp.server.server → fastmcp.server.context
  2. fastmcp.resources.resource → fastmcp.server.tasks.config
  3. fastmcp.server.tasks.__init__ → fastmcp.server.tasks.notifications ← fails here

Python version compatibility:

  • datetime.UTC: Added in Python 3.11 (PEP 615)
  • datetime.timezone.utc: Available since Python 3.2

Existing codebase pattern:

The rest of the FastMCP codebase consistently uses timezone.utc:

  • src/fastmcp/client/tasks.py:181: datetime.now(timezone.utc)
  • src/fastmcp/server/tasks/handlers.py:62: datetime.now(timezone.utc)
  • src/fastmcp/server/tasks/requests.py:207-215: Multiple uses of timezone.utc
  • src/fastmcp/server/middleware/middleware.py:60: datetime.now(timezone.utc)
Related Files
  • src/fastmcp/server/tasks/notifications.py:25 - New file added in this PR with incompatible import
  • All other src/fastmcp/server/tasks/*.py files use timezone.utc correctly

Updated analysis based on latest workflow run (21670322009) - 2026-02-04


marvin-context-protocol Bot commented Feb 4, 2026

Test Failure Analysis

Summary: All Python 3.10 tests are failing due to ImportError: cannot import name 'UTC' in the newly added notifications.py file.

Root Cause: The new file src/fastmcp/server/tasks/notifications.py uses from datetime import UTC, datetime at line 25. However, datetime.UTC was only added in Python 3.11, while FastMCP supports Python ≥3.10.

Suggested Solution: Replace UTC with timezone.utc to maintain Python 3.10 compatibility. This is the pattern used consistently throughout the codebase.

In src/fastmcp/server/tasks/notifications.py:

  • Change line 25 from: from datetime import UTC, datetime
  • To: from datetime import datetime, timezone
  • Change line 66 from: "enqueued_at": datetime.now(UTC).isoformat(),
  • To: "enqueued_at": datetime.now(timezone.utc).isoformat(),
Detailed Analysis

The error appears in all Python 3.10 test jobs:

  • Tests: Python 3.10 on ubuntu-latest ❌
  • Tests: Python 3.10 on windows-latest ❌
  • Tests with lowest-direct dependencies ❌
  • Integration tests ❌

All failures show the same stack trace:

src/fastmcp/server/tasks/__init__.py:14: in <module>
    from fastmcp.server.tasks.notifications import (
src/fastmcp/server/tasks/notifications.py:25: in <module>
    from datetime import UTC, datetime
E   ImportError: cannot import name 'UTC' from 'datetime'

The Python 3.13 tests pass because datetime.UTC is available in that version.

Python version compatibility:

  • datetime.UTC: Added in Python 3.11 (PEP 615)
  • datetime.timezone.utc: Available since Python 3.2
Related Files

File with issue:

  • src/fastmcp/server/tasks/notifications.py:25 - Import statement
  • src/fastmcp/server/tasks/notifications.py:66 - Usage of UTC

Files using the correct pattern (examples):

  • src/fastmcp/server/middleware/middleware.py:6 - from datetime import datetime, timezone
  • src/fastmcp/server/tasks/requests.py:11 - from datetime import datetime, timedelta, timezone
  • src/fastmcp/server/tasks/handlers.py:10 - from datetime import datetime, timezone
  • src/fastmcp/server/tasks/subscriptions.py:12 - from datetime import datetime, timezone

All use timezone.utc instead of UTC for Python 3.10 compatibility.


Updated analysis based on workflow run 21672391648 - 2026-02-04 13:00 UTC

gfortaine force-pushed the fix/statusMessage-not-forwarded branch from 10acb17 to 936fab8 on February 4, 2026 13:59
@gfortaine

hey team 👋

follow-up to #2905 — this adds distributed notification support so elicitation actually works for background tasks running on remote workers.

ran a deep review loop and caught a few things worth mentioning:

fixes included in this pr:

  • set_current() instead of increment() for progress — atomic, no race condition if the same tool reports from multiple contexts
  • fail-fast on push_notification failure — no 1-hour hang waiting for a response that'll never come
  • cleanup registration via exit_stack — stop_subscriber actually gets called now
  • get_task_context().session_id as single source of truth — removes closure capture mismatch risk

tests: 280 passed, 3 skipped. rebased on main.

happy to walk through any of the implementation details if that would help. is there anything specific you'd like me to adjust before this gets merged?

cc @jlowin @chrisguidry

gfortaine force-pushed the fix/statusMessage-not-forwarded branch from 936fab8 to 5f70960 on February 4, 2026 19:34
@marvin-context-protocol

Test Failure Analysis

Summary: All Python 3.10 tests fail with ImportError: cannot import name 'UTC' from 'datetime' in the new notifications.py file.

Root Cause: The PR adds src/fastmcp/server/tasks/notifications.py:25 with:

from datetime import UTC, datetime

However, datetime.UTC was only introduced in Python 3.11. FastMCP supports Python ≥3.10 (see pyproject.toml), so this import fails on Python 3.10.x.

Suggested Solution: Replace datetime.UTC with the Python 3.10-compatible alternative:

# In src/fastmcp/server/tasks/notifications.py line 25
from datetime import datetime, timezone

# Then use timezone.utc instead of UTC throughout the file
# Example: datetime.now(timezone.utc) instead of datetime.now(UTC)
Detailed Analysis

All failing jobs show the same import chain:

tests/conftest.py:15 → fastmcp/__init__.py:15 → fastmcp/server/__init__.py:1 
→ fastmcp/server/context.py:28 → fastmcp/resources/__init__.py:1 
→ fastmcp/resources/function_resource.py:17 → fastmcp/resources/resource.py:32 
→ fastmcp/server/tasks/__init__.py:14 
→ fastmcp/server/tasks/notifications.py:25
ImportError: cannot import name 'UTC' from 'datetime'

The import happens at module load time, so all tests fail immediately before any test code runs.

Failed jobs:

  • Integration tests (Python 3.10 on ubuntu-latest)
  • Tests with lowest-direct dependencies (Python 3.10)
  • Tests: Python 3.10 on ubuntu-latest
  • Tests: Python 3.10 on windows-latest

Only Python 3.13 tests passed (where datetime.UTC is available).

Related Files
  • src/fastmcp/server/tasks/notifications.py: New file added by this PR, contains the incompatible import at line 25
  • pyproject.toml: Specifies requires-python = ">=3.10" (line 28)

chrisguidry self-assigned this Feb 5, 2026

chrisguidry commented Feb 5, 2026

This is looking really compelling so far, @gfortaine, thank you!

However, similar to the last PR, I'm having trouble with the style of the tests here. I think these heavily mocked tests are going to end up being difficult to maintain over time, because they are mocking many internal implementation details (including of redis itself). Take a look at the surrounding task-related tests and you'll see a very stark difference in style.

Can we rework these tests to be simpler by just setting up example MCP servers with task support that make elicitations and then assert that the client sees those elicitations? We should be able to write tests that just "do the thing" because we have redis available (via the memory:// broker) while the tests are running.

In our /python-tests skill in the repo, we've got some good guidance here: https://github.com/jlowin/fastmcp/blob/main/.claude/skills/python-tests/SKILL.md#dont-mock-what-you-own

gfortaine added a commit to gfortaine/fastmcp that referenced this pull request Feb 7, 2026
Address @chrisguidry's review feedback on PR PrefectHQ#2906:

- Rewrite test_context_background_task.py (1303→345 lines): replace heavily
  mocked tests with Client(mcp) integration tests using memory:// Docket
  backend. Zero mocks for Redis, sessions, or Docket internals.

- Rewrite test_notifications.py (626→104 lines): replace mock-heavy unit
  tests with 2 E2E integration tests that exercise the full notification
  pipeline through real Docket.

- Fix Python 3.10 CI failure: replace datetime.UTC (3.11+) with
  datetime.timezone.utc in notifications.py.

- Fix report_progress bug: ExecutionProgress has no set_current() method;
  use increment() with delta tracking instead.

Test coverage: 18 tests, all passing in 2.4s.

marvin-context-protocol Bot commented Feb 7, 2026

Test Failure Analysis

Summary: Static type checking failed due to incorrect argument types for Redis blocking operations (blpop, brpop, lpush).

Root Cause: The Redis blpop and brpop methods expect a list of keys as the first argument, but the code passes a single string key. This is a type signature issue in the new elicitation and notification code.

Suggested Solution: Wrap all Redis blocking operation keys in a list:

  1. src/fastmcp/server/tasks/elicitation.py:175-178

    result = await redis.blpop(
        [docket.key(response_key)],  # Wrap in list
        timeout=max_wait_seconds,
    )
  2. src/fastmcp/server/tasks/notifications.py:107-109

    result = await redis.brpop(
        [queue_key],  # Wrap in list
        timeout=SUBSCRIBER_TIMEOUT_SECONDS
    )

These changes will fix all the type errors because blpop/brpop are designed to accept multiple keys (for fairness when waiting on multiple queues), even though this use case only needs one.

Detailed Analysis

Type Errors Found

The type checker (ty) reports 10 errors:

  1. Line 175: await redis.blpop(...) - not awaitable due to union type ambiguity
  2. Line 176: First argument to blpop expects list[Unknown], found Unknown | str
  3. Line 264: await redis.lpush(...) - not awaitable due to union type ambiguity
  4. Line 70 (notifications.py): await redis.lpush(...) - not awaitable
  5. Line 107 (notifications.py): await redis.brpop(...) - not awaitable
  6. Line 108 (notifications.py): First argument to brpop expects list[Unknown], found str

All stem from the same root cause: the Redis client library type stubs indicate these methods accept keys: List (plural), not a single string.

Why This Matters

From the Redis documentation:

  • BLPOP key [key ...] timeout - can monitor multiple keys
  • BRPOP key [key ...] timeout - can monitor multiple keys

The Python redis library reflects this by requiring a list, even for single-key operations.

Related Files
  • src/fastmcp/server/tasks/elicitation.py:175-178 - blpop call in elicit_for_task
  • src/fastmcp/server/tasks/elicitation.py:264-267 - lpush call in handle_task_input
  • src/fastmcp/server/tasks/notifications.py:70 - lpush call in push_notification
  • src/fastmcp/server/tasks/notifications.py:107-109 - brpop call in subscriber loop

Updated: 2026-02-07 (latest static analysis failure)


marvin-context-protocol Bot commented Feb 7, 2026

Test Failure Analysis

Summary: Static type checking failed with 11 type errors in Redis blocking operations (blpop, brpop, lpush) and test argument types.

Root Cause:

  1. Redis blpop/brpop methods expect a list of keys, but the code passes single strings
  2. Redis lpush has union return types (Awaitable[int] | int) causing await ambiguity in type checker
  3. Test passes potentially None values where str is required

Suggested Solution:

Fix 1: Wrap Redis keys in lists

# src/fastmcp/server/tasks/elicitation.py:175-178
result = await redis.blpop(
    [docket.key(response_key)],  # Wrap in list
    timeout=max_wait_seconds,
)

# src/fastmcp/server/tasks/notifications.py:107-109
result = await redis.brpop(
    [queue_key],  # Wrap in list
    timeout=SUBSCRIBER_TIMEOUT_SECONDS
)

Fix 2: Add type ignores for lpush union types

# src/fastmcp/server/tasks/elicitation.py:264
await redis.lpush(  # type: ignore[misc]
    docket.key(response_key),
    json.dumps(response),
)

# src/fastmcp/server/tasks/notifications.py:70
await redis.lpush(key, message)  # type: ignore[misc]

# src/fastmcp/server/tasks/notifications.py:132
await redis.lpush(queue_key, json.dumps(message))  # type: ignore[misc]

Fix 3: Handle None values in test

# tests/server/tasks/test_notifications.py:53-59
success = await handle_task_input(
    task_id=captured["task_id"] or "",  # Provide fallback
    session_id=captured["session_id"] or "",  # Provide fallback
    action="accept",
    content={"value": "hello"},
    fastmcp=mcp,
)
Detailed Analysis

Type Errors from Workflow Run 21789768033

The static analysis (prek with ty check) found 11 diagnostics:

1. Invalid await on blpop (elicitation.py:175)

  • Error: Unknown | Awaitable[list[Unknown]] | list[Unknown] is not awaitable
  • Cause: Type checker can't determine return type when argument doesn't match signature

2-3. Invalid argument types for blpop (elicitation.py:176)

  • Error: Expected list[Unknown], found Unknown | str
  • Cause: blpop() signature requires keys: List, not a single string
  • From redis-py: def blpop(self, keys: List, timeout: Optional[Number] = 0)

4. Invalid await on lpush (elicitation.py:264)

  • Error: Unknown | Awaitable[int] | int is not awaitable
  • Cause: Union type from redis-py stubs (supports both sync/async clients)

5. Invalid await on lpush (notifications.py:70)

  • Error: Awaitable[int] | int is not awaitable
  • Same union type issue

6-8. Invalid await/argument for brpop (notifications.py:107-108)

  • Error: Expected list[Unknown], found str
  • Same issue as blpop: requires a list of keys

9. Invalid await on lpush (notifications.py:132)

  • Same union type issue

10-11. Test argument type errors (test_notifications.py:54-55)

  • Error: Expected str, found None
  • Cause: the captured dict is typed as dict[str, str | None] with initial None values
  • The test assumes these will be set, but the type checker sees a potential None

Why Redis Methods Require Lists

Both BLPOP and BRPOP are designed to monitor multiple keys simultaneously and return the first one that has data. From Redis docs:

  • BLPOP key [key ...] timeout
  • BRPOP key [key ...] timeout

Even for single-key use cases, the Python client requires a list to match the command signature.
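The key-priority semantics can be illustrated with a small in-memory simulation (not real redis-py, just the command's behavior over plain lists):

```python
def blpop_sim(store: dict[str, list[str]], keys: list[str]):
    """Simulate BLPOP's multi-key semantics over in-memory lists:
    scan the keys in the order given and pop from the head of the
    first non-empty one. Returns a (key, value) pair like redis-py,
    or None when all lists are empty (where the real command would
    block until data arrives or the timeout expires)."""
    for key in keys:
        if store.get(key):
            return key, store[key].pop(0)
    return None
```

Passing a single-element list, as in the fixes above, is just the degenerate case of this multi-key contract.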

Why lpush Has Union Types

The redis-py library supports both synchronous and async clients with a shared type hierarchy. This creates union return types like Awaitable[int] | int. The code correctly uses await, but the type checker can't statically determine which branch of the union applies.
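An alternative to `# type: ignore[misc]` is narrowing the union with `typing.cast`. The sketch below simulates the union-returning signature with a stand-in class (hypothetical, for illustration only):

```python
import asyncio
from typing import Awaitable, cast

class FakeRedis:
    """Stand-in whose lpush is annotated like redis-py's shared
    sync/async type hierarchy: Awaitable[int] | int."""

    def lpush(self, key: str, *values: str) -> "Awaitable[int] | int":
        async def _push() -> int:
            return len(values)  # pretend this is the new list length
        return _push()

async def enqueue() -> int:
    redis = FakeRedis()
    # cast() tells the checker which branch of the union applies here,
    # instead of suppressing the diagnostic with `# type: ignore[misc]`
    return await cast("Awaitable[int]", redis.lpush("queue", "a", "b"))
```

`cast` is a no-op at runtime, so either approach behaves identically; the trade-off is purely about how localized and self-documenting the suppression is.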

Related Files

Files with type errors:

  • src/fastmcp/server/tasks/elicitation.py:176, 264 - blpop and lpush calls
  • src/fastmcp/server/tasks/notifications.py:70, 107-108, 132 - lpush and brpop calls
  • tests/server/tasks/test_notifications.py:54-55 - test with potential None values

Updated: 2026-02-08 01:00 UTC (Workflow run 21789768033)

Add Redis-backed notification queue (LPUSH/BRPOP) enabling the MCP server
to notify clients about background task events like elicitation requests.

- notifications.py: subscriber management with weakref tracking, retry
  logic, TTL expiration, and graceful shutdown
- __init__.py: export ensure_subscriber_running, push_notification,
  stop_subscriber
- context.py: report_progress uses delta tracking via increment() instead
  of set_current() (which doesn't exist), stores progress in Redis for
  background tasks
- elicitation.py: replace polling with BLPOP for efficient blocking wait,
  fail-fast on notification push failure, use get_task_context() for
  authoritative session_id
- handlers.py: subscriber cleanup on session disconnect via
  _exit_stack.push_async_callback()

Replace 1300+ lines of mock-heavy unit tests with 391 lines of integration
tests using real Client(mcp) connections and memory:// Docket backend.

- test_context_background_task.py: 17 tests covering report_progress delta
  tracking, elicitation flow, edge cases, and fail-fast on push failure
- test_notifications.py: 2 E2E tests for notification queue lifecycle
@gfortaine gfortaine force-pushed the fix/statusMessage-not-forwarded branch from b2ca066 to 1f84e50 Compare February 8, 2026 00:59
@gfortaine
Contributor Author

@chrisguidry totally fair — mocking redis internals creates maintenance debt and the contrast with the surrounding task tests was stark. rewrote everything.

  • test_context_background_task.py: 896 → 394 lines. Client(mcp) + memory://, zero mocks. tests just spin up a server, call tools with task=True, elicit, assert.
  • test_notifications.py: added NotificationCaptureHandler that verifies the client sees notifications/tasks/status with input_required metadata — exactly the end-to-end assertion you asked for.
  • only surgical mock: TestElicitFailFast patches push_notification to test fail-fast. everything else is real.

also switched to TaskStatusNotification.model_validate() for typed notification path and fixed the ty static analysis issues.

rebased on v3.0.0b2 (4 commits). how does this land for you?

@gfortaine gfortaine changed the title fix: Context.report_progress stores progress in Redis for background tasks feat: distributed notification queue + BLPOP elicitation for background tasks Feb 8, 2026
Collaborator

@chrisguidry chrisguidry left a comment


@gfortaine It's a thing of beauty, thank you! Really appreciate the readability of the test now and that you fixed up the previous round of them too. Also great catch finding the SDK's correct notification types. When I originally pulled this all together, the SDK was in flux and only partially implemented too. I think things have definitely firmed up there since the original implementation.


  1. Set task status to "input_required" via Redis
- 2. Send notifications/tasks/updated with elicitation metadata
+ 2. Send notifications/tasks/status with elicitation metadata
Collaborator

Thank you! 😅

gfortaine added a commit to gfortaine/fastmcp that referenced this pull request Feb 9, 2026
…fectHQ#3121)

Merges two orthogonal PRs into a single coherent changeset:

From PrefectHQ#2906 - Distributed notification queue & BLPOP elicitation:
- Add notifications.py: LPUSH/BRPOP notification queue for background tasks
- Replace polling-based elicitation with single BLPOP call
- Fail-fast on notification push failure (return cancel immediately)
- Add Docket ExecutionProgress API for report_progress() in background tasks
- Wire notification subscriber lifecycle on session exit stack
- Use typed TaskStatusNotification.model_validate() in handlers

From PrefectHQ#3121 - Access token snapshot & lifespan_context fallback:
- Snapshot AccessToken to Redis alongside task metadata
- Add _task_access_token ContextVar restored on Docket worker entry
- Extend get_access_token() with expiry-aware fallback chain
- Add lifespan_context property fallback to server._lifespan_result

Reviewer fixes applied to PrefectHQ#3121:
- Replace silent except with logger.warning()
- Use _current_docket before DocketDependency.docket.get() fallback
- Hoist datetime import to module level
- Type _access_token_cv_token as Token[...] | None instead of Any

Tests: 27 new/consolidated tests (de-mocked), 264 total tasks/ tests pass.
gfortaine added a commit to gfortaine/fastmcp that referenced this pull request Feb 10, 2026

Labels

bug: Something isn't working. Reports of errors, unexpected behavior, or broken functionality.
server: Related to FastMCP server implementation or server-side functionality.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Task polling statusMessage not forwarded to MCP client

2 participants