fix get_access_token and lifespan_context returning None in background tasks - issue (#3095) #3121
Conversation
Walkthrough

Adds background-task access token propagation and a lifespan-context fallback for non-request tasks. On task submission, if an AccessToken exists, it is serialized to Redis under a task-specific key with a TTL. Background-task startup restores that token into a private ContextVar (with expiry validation) so get_access_token() can return the restored token in task contexts. lifespan_context now returns a stored lifespan result when no request context exists. ContextVar restoration handles are tracked and reset on context exit.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/fastmcp/server/dependencies.py (1)
508-521: Expiration check looks correct but import could be hoisted.

The `from datetime import datetime, timezone` import inside the function body on line 517 is executed on every call to `get_access_token()` that reaches this branch. While Python caches module imports, moving it to the top level would be cleaner and marginally faster for hot paths.

Also, verify that `expires_at` is always an `int` (epoch seconds). If it could be a `float` from the JWT, the `int()` cast on `datetime.now(...)` is fine, but the `<` comparison between `int` and `int` is safe only if both sides agree on the unit.

src/fastmcp/server/tasks/handlers.py (1)
106-113: Token write failure will abort task submission — consider whether this is intentional.

The `access_token` Redis write (lines 110-113) is inside the same `async with docket.redis()` block as the mandatory metadata writes. If `model_dump_json()` raises (unlikely but possible with custom serializers), the entire `submit_to_docket` call fails. Since the access token is supplementary (tasks can run without it), you may want to guard this write separately so that serialization issues don't prevent task submission.

♻️ Optional: isolate token write from mandatory metadata

```diff
 async with docket.redis() as redis:
     await redis.set(task_meta_key, task_key, ex=ttl_seconds)
     await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds)
     await redis.set(poll_interval_key, str(poll_interval_ms), ex=ttl_seconds)
-    if access_token is not None:
-        await redis.set(
-            access_token_key, access_token.model_dump_json(), ex=ttl_seconds
-        )
+    if access_token is not None:
+        try:
+            await redis.set(
+                access_token_key, access_token.model_dump_json(), ex=ttl_seconds
+            )
+        except Exception:
+            import logging
+            logging.getLogger(__name__).warning(
+                "Failed to snapshot access token for task %s", server_task_id,
+                exc_info=True,
+            )
```
```python
async def _restore_task_access_token(self, session_id: str, task_id: str) -> None:
    """Restore the access token snapshot from Redis into a ContextVar.

    Called when creating a task-aware Context in a Docket worker. The token
    was stored at submit_to_docket() time. If the token has expired, it is
    not restored (get_access_token() will return None).
    """
    docket = _current_docket.get()
    if docket is None:
        return

    token_key = docket.key(f"fastmcp:task:{session_id}:{task_id}:access_token")
    try:
        async with docket.redis() as redis:
            token_data = await redis.get(token_key)
            if token_data is not None:
                restored = AccessToken.model_validate_json(token_data)
                self._access_token_cv_token = _task_access_token.set(restored)
    except Exception:
        # Don't let token restoration failures break task execution
        pass
```
🛠️ Refactor suggestion | 🟠 Major
Silent except Exception: pass swallows all errors — log a warning at minimum.
Static analysis flags this (S110, BLE001), and the coding guidelines state: "Never use bare except - be specific with exception types." While except Exception isn't technically bare, it catches far too broadly (e.g., AccessToken.model_validate_json raising ValidationError, Redis connectivity issues, key encoding problems). Silently discarding these makes debugging token-restoration failures very difficult.
At minimum, log a warning. Ideally, narrow the exception types.
As per coding guidelines: "Never use bare except - be specific with exception types."
🛡️ Proposed fix

```diff
+import logging
+
+_logger = logging.getLogger(__name__)
+
 try:
     async with docket.redis() as redis:
         token_data = await redis.get(token_key)
         if token_data is not None:
             restored = AccessToken.model_validate_json(token_data)
             self._access_token_cv_token = _task_access_token.set(restored)
-except Exception:
-    # Don't let token restoration failures break task execution
-    pass
+except Exception:
+    # Don't let token restoration failures break task execution
+    _logger.warning(
+        "Failed to restore access token for task %s:%s",
+        session_id,
+        task_id,
+        exc_info=True,
+    )
```

🧰 Tools
🪛 Ruff (0.14.14)
[error] 808-810: try-except-pass detected, consider logging the exception
(S110)
[warning] 808-808: Do not catch blind exception: Exception
(BLE001)
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/fastmcp/server/dependencies.py (2)
508-521: Expiration check and fallback look correct, but consider hoisting the import.

The fallback logic is sound: check the `_task_access_token` ContextVar, validate expiry, and return. The `<` comparison correctly treats a token whose `expires_at` equals `now` as still valid.

However, `from datetime import datetime, timezone` is a deferred import inside what can be a frequently called function. Since this module already imports from `contextvars`, `contextlib`, etc. at the top level, consider adding `datetime` to the top-level imports instead.

♻️ Hoist import to module level

```diff
 from collections.abc import AsyncGenerator, Callable
 from contextlib import AsyncExitStack, asynccontextmanager
 from contextvars import ContextVar, Token
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from functools import lru_cache
```

Then in `get_access_token()`:

```diff
 if task_token.expires_at is not None:
-    from datetime import datetime, timezone
-
     if task_token.expires_at < int(datetime.now(timezone.utc).timestamp()):
         return None
```
787-787: Consider using `Token[AccessToken | None] | None` instead of `Any`.

Both `_CurrentContext._access_token_cv_token` and `_CurrentAccessToken._access_token_cv_token` are typed as `Any`. Since you already import `Token` from `contextvars` (line 15), you could use the precise type for better static analysis support.

♻️ Suggested type annotation

```diff
 class _CurrentContext(Dependency):  # type: ignore[misc]
     _context: Context | None = None
-    _access_token_cv_token: Any = None
+    _access_token_cv_token: Token[AccessToken | None] | None = None

 class _CurrentAccessToken(Dependency):  # type: ignore[misc]
-    _access_token_cv_token: Any = None
+    _access_token_cv_token: Token[AccessToken | None] | None = None
```

Also applies to: 1066-1067
```python
docket = _current_docket.get()
# Fall back to docket's own worker ContextVar, which is set by the worker
# even when _CurrentContext hasn't run (no ctx: Context in signature)
if docket is None:
    try:
        from docket.dependencies import Dependency as DocketDependency

        docket = DocketDependency.docket.get()
    except (ImportError, LookupError):
        pass
if docket is None:
    return None
```
🧩 Analysis chain
🏁 Script executed:

```shell
rg -n "DocketDependency\.docket" --type=py -C2
```

Repository: jlowin/fastmcp
Length of output: 439

🏁 Script executed:

```shell
# Check if there are other imports or uses of docket library
rg -n "from docket" --type=py -C1
```

Repository: jlowin/fastmcp
Length of output: 6373

🏁 Script executed:

```shell
# Look at the full context of the file to understand the pattern
head -n 20 src/fastmcp/server/dependencies.py
```

Repository: jlowin/fastmcp
Length of output: 768
🌐 Web query:
docket library Python Dependency ContextVar docket attribute documentation
💡 Result:
In Docket (PyPI package pydocket), “dependency injection” is done by declaring default parameter values like CurrentDocket(), CurrentWorker(), CurrentExecution(), TaskKey(), TaskArgument(), or Depends(fn) in your task signature; Docket resolves those at runtime when executing the task. (chrisguidry.github.io)
Those “current-*” dependencies are contextual (they’re meant to give you “the current docket/worker/execution for this running task” without threading it through every call). In Python, the standard way libraries implement safe per-request/per-task ambient context across async boundaries is contextvars.ContextVar. ContextVar objects have a read-only .name attribute (so if a library creates something like ContextVar("docket"), its “docket attribute”/name is just "docket"). (docs.python.org)
If you’re coming from FastMCP: FastMCP re-exports these DI concepts (and notes its DI is “powered by Docket”; task-related dependencies like CurrentDocket/CurrentWorker require fastmcp[tasks]). (gofastmcp.com)
Citations:
- 1: https://chrisguidry.github.io/docket/api-reference/?utm_source=openai
- 2: https://docs.python.org/3/library/contextvars.html?utm_source=openai
- 3: https://gofastmcp.com/python-sdk/fastmcp-dependencies?utm_source=openai
🌐 Web query:
pydocket Dependency.docket ContextVar stable API
💡 Result:
Dependency.docket (a ContextVar hanging off the internal Dependency base class) should be treated as internal / not a stable public API. Docket is still marked “Beta” on PyPI, so internals are especially likely to change. [1]
If you need “current docket/worker/execution” context in a supported way, use the documented built-in dependencies instead:
- `CurrentDocket()` for the current `Docket` [2]
- `CurrentWorker()` / `CurrentExecution()` for worker/execution context [2]
- `TaskKey()` / `TaskArgument()` for task identity/arguments (often used inside your own dependency functions) [2]
And for custom DI, use the public helpers Depends(...) (per-task) and Shared(...) (worker-scoped), rather than reaching into Dependency internals. [3]
Bottom line: don’t rely on Dependency.docket / ContextVar attributes directly; prefer CurrentDocket() (and friends) or Depends(...). [2][3]
References:
[1] https://pypi.org/project/pydocket/
[2] https://chrisguidry.github.io/docket/dependencies/
[3] https://chrisguidry.github.io/docket/api-reference/
Replace DocketDependency.docket with the documented public API.
The fallback to DocketDependency.docket.get() relies on an internal, unstable API that is not part of Docket's public contract. According to Docket's documentation, Dependency.docket is an internal implementation detail and should not be used directly. Additionally, Docket is currently in Beta status on PyPI, making internal APIs especially prone to breaking changes.
Use the documented public dependency CurrentDocket() instead, which is the official way to access the current docket context in Docket-powered applications. If the fallback is needed to handle cases where _CurrentContext hasn't propagated, consider whether CurrentDocket() would better address the underlying issue, or add a comment explaining why this fallback is necessary.
```python
        assert result.data == "Hello, Bob!"


class TestAccessTokenInBackgroundTasks:
```
@cristiangreco94 I've got similar feedback about this test suite as I did with @gfortaine's last PR. He did a really phenomenal job of de-mocking it; can you give this test suite a pass too?
The general idea here is that we have a fully functional in-memory Docket and redis (via Docket's memory broker implementation using fakeredis) so you shouldn't really need to mock anything here. Just write the test using the FastMCP client and example MCP servers and tools/prompts/resources to demonstrate the end-to-end flow. You can absolutely use mocks when you want to emulate failure conditions, etc, but these tests as they stand right now create such a closed mock world that they could easily drift from reality without failing.
```python
        try:
            task_info = TaskContextInfo(
```
Having a try in a unit test is kind of a code smell for me. If we need to ensure some clean up across tests, let's have a pytest fixture that handles that instead so that the test can just be "flat" and more obvious.
…fectHQ#3121)

Merges two orthogonal PRs into a single coherent changeset:

From PrefectHQ#2906 - Distributed notification queue & BLPOP elicitation:
- Add notifications.py: LPUSH/BRPOP notification queue for background tasks
- Replace polling-based elicitation with single BLPOP call
- Fail-fast on notification push failure (return cancel immediately)
- Add Docket ExecutionProgress API for report_progress() in background tasks
- Wire notification subscriber lifecycle on session exit stack
- Use typed TaskStatusNotification.model_validate() in handlers

From PrefectHQ#3121 - Access token snapshot & lifespan_context fallback:
- Snapshot AccessToken to Redis alongside task metadata
- Add _task_access_token ContextVar restored on Docket worker entry
- Extend get_access_token() with expiry-aware fallback chain
- Add lifespan_context property fallback to server._lifespan_result

Reviewer fixes applied to PrefectHQ#3121:
- Replace silent except with logger.warning()
- Use _current_docket before DocketDependency.docket.get() fallback
- Hoist datetime import to module level
- Type _access_token_cv_token as Token[...] | None instead of Any

Tests: 27 new/consolidated tests (de-mocked), 264 total tasks/ tests pass.
@cristiangreco94 @chrisguidry: I have a consolidated branch; the test suite is fully end-to-end (in-memory docket + fakeredis), no mock world, no try blocks in tests, which addresses the review feedback here. Also picked up a non-monotonic progress delta bug along the way. 4 commits, rebased on upstream/main. PR is up at #2906. Happy to adjust if @chrisguidry or @jlowin prefer a different structure.
@gfortaine Thanks but I actually just merged #2906 :D @cristiangreco94 if you can rebase this one on main, I can review it soon after
I think this PR from @gfortaine supersedes mine, so I will proceed and close it. If not, let me know and I will reopen it and work on it.
Description
When a tool is marked with task=True, get_access_token() always returns None and lifespan_context is always empty — even when authentication is properly configured and a lifespan is set. This happens because neither the HTTP request scope nor the SDK's auth context var survive the Docket worker boundary, and request_ctx isn't set in workers.
The fix snapshots the AccessToken into Redis at submit_to_docket() time (alongside existing task metadata), restores it into a ContextVar when the background task Context is created in the Docket worker, and adds it as a fallback in get_access_token(). Expired tokens (past expires_at) return None rather than raising. For lifespan_context, the fix falls back to reading from the FastMCP server instance directly when request_context is unavailable — no Redis needed.
Contributors Checklist
Review Checklist