Conversation
Wraps a user-provided event stream handler as a capability, allowing it to participate in the capability middleware chain instead of requiring the dedicated `event_stream_handler` argument on `Agent`/`Agent.run`. Ordered 'outermost' so the handler sees events after all other `wrap_run_event_stream` wrappers have transformed the stream. Both approaches coexist: the existing `event_stream_handler` params remain unchanged.
| """A capability that consumes the agent's event stream via a user-provided async handler. | ||
|
|
||
| The handler receives the stream of [`AgentStreamEvent`][pydantic_ai.messages.AgentStreamEvent]s | ||
| emitted during model streaming and tool execution for each | ||
| [`ModelRequestNode`][pydantic_ai._agent_graph.ModelRequestNode] and | ||
| [`CallToolsNode`][pydantic_ai._agent_graph.CallToolsNode]. | ||
|
|
||
| When this capability is registered, [`agent.run()`][pydantic_ai.Agent.run] automatically | ||
| enables streaming so the handler fires without requiring an explicit `event_stream_handler` | ||
| argument. The capability is ordered `'outermost'` so the handler sees events after all | ||
| other [`wrap_run_event_stream`][pydantic_ai.capabilities.AbstractCapability.wrap_run_event_stream] | ||
| wrappers have transformed the stream. |
The docstring mentions that the capability works "without requiring an explicit event_stream_handler argument," but it doesn't warn about what happens if both are set. Looking at abstract.py:309-326, when both are present, HandleEventStream.wrap_run_event_stream consumes the stream as the outermost wrapper, and the event_stream_handler param handler receives an empty iterable.
Worth adding a note that this capability is mutually exclusive with the event_stream_handler parameter, or alternatively adding a runtime guard (e.g. in for_run) that raises when both are configured.
| [`ModelRequestNode`][pydantic_ai._agent_graph.ModelRequestNode] and
| [`CallToolsNode`][pydantic_ai._agent_graph.CallToolsNode].
The mkdocstrings cross-references here point to private module pydantic_ai._agent_graph — these won't render as working links in the docs and expose internal architecture. Consider either removing them, or referencing the public-facing concepts (e.g. "model request and tool execution nodes") without linking to private classes.
- Switch HandleEventStream to a tee: forward events downstream while also feeding the handler via an internal memory object stream. Multiple capabilities (and the explicit `event_stream_handler` param) can now observe the same stream without shadowing each other.
- Drop `outermost` ordering — no longer needed with the tee design.
- Drop the private `_agent_graph` module links from the docstring.
- Add HandleEventStream row to the capabilities.md table.
The `EventStreamHandler` type alias now accepts either an observer (async def returning None) or a transformer (async generator yielding AgentStreamEvents). Both forms are supported by `HandleEventStream` and the top-level `event_stream_handler` parameter:

- Observer: the capability tees events to the handler while passing them through unchanged to the rest of the chain.
- Transformer: the capability iterates the handler's output and forwards those events to the chain, letting the handler modify, drop, or add events.

At the `event_stream_handler` parameter level the handler is the terminal consumer, so observer and transformer are equivalent (both iterated to completion via a shared `_consume_event_stream_handler` helper).
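The two forms differ only in whether the handler yields. A minimal sketch of what each looks like, using stand-in types (`RunContext` and the string `Event` alias here are placeholders for the real pydantic_ai types):

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator

# Stand-ins for pydantic_ai's RunContext and AgentStreamEvent (assumptions
# for this sketch; the real types live in pydantic_ai).
class RunContext: ...
Event = str

# Observer form: an async def returning None; it only watches the stream.
async def log_events(ctx: RunContext, events: AsyncIterable[Event]) -> None:
    async for event in events:
        print(f"saw: {event}")

# Transformer form: an async generator; its output replaces the inner stream,
# so it can modify, drop, or add events.
async def drop_pings(ctx: RunContext, events: AsyncIterable[Event]) -> AsyncIterator[Event]:
    async for event in events:
        if event != "ping":
            yield event

async def main() -> list[Event]:
    async def source() -> AsyncIterator[Event]:
        for e in ("start", "ping", "done"):
            yield e

    return [e async for e in drop_pings(RunContext(), source())]

print(asyncio.run(main()))  # ['start', 'done']
```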
All three durable-exec backends (prefect, temporal, dbos) call the user's event stream handler directly in their wrapper models/agents. Switch those sites to the shared `consume_event_stream_handler` helper in `agent/abstract.py` so both observer and transformer handler forms work.
…els to capability chain

- Revert `EventStreamHandler` to its original observer-only signature (`Awaitable[None]`). The param-level handler is a terminal consumer where a transformer return has nowhere to go.
- Introduce `EventStreamProcessor` (async generator yielding events) as a sibling type used only where transformation is meaningful.
- `HandleEventStream.handler` accepts either; it dispatches via `inspect.isasyncgenfunction` — tee for observers, iterate-and-yield for processors. Name follows the `HistoryProcessor` convention already used for the history-processing capability.
- Remove the cross-module `consume_event_stream_handler` helper; call sites just `await handler(...)` again.

Also thread the owning agent through each durable-exec model wrapper (`TemporalModel` already had it; `DBOSModel` and `PrefectModel` now do), and route the streamed response through `agent.root_capability.wrap_run_event_stream` inside the activity/step before passing it to the handler. With no capability overriding the hook this is a transparent pass-through, so existing event stream behavior and span output are unchanged; `HandleEventStream` (and any other capability overriding `wrap_run_event_stream`) now fires inside the durable activity rather than being silently ignored. Expose `root_capability` on `AbstractAgent` / `WrapperAgent` so wrapper models can reach it polymorphically.
- Extend test_wrapper_agent to check root_capability delegates.
- Add HandleEventStream with a no-op observer handler to the shared complex_agent fixtures in test_dbos.py and test_temporal.py. Since the capability just tees events, the param-level event_stream_handler continues to see the same event sequence (snapshots unchanged), and the activity-level wrap_run_event_stream routing now gets exercised realistically.
…le-invoke

- Extract a `run_event_stream_through_capabilities` helper in `agent/abstract.py` and call it from all three durable-exec model wrappers (DBOS/Prefect/Temporal). Replaces the gated inline code that was skipping capability wrapping whenever `event_stream_handler` was unset, so a user who registers only `HandleEventStream` on the agent now sees real model events inside the activity/step.
- `HandleEventStream.wrap_run_event_stream` now dispatches by inspecting the handler's return value (`isinstance(probe, AsyncIterator)`) rather than `inspect.isasyncgenfunction`, so callable-class processors whose `__call__` is an async generator work correctly too.
- Tee loop catches `BrokenResourceError`/`ClosedResourceError` so a handler that iterates once and returns no longer crashes the wrap — downstream consumers keep receiving events.
- In `Agent.run()`'s streaming loop, skip the outer `wrap_run_event_stream` pass when the stream is a `CompletedStreamedResponse` (the marker durable wrappers return after consuming events inside an activity/step). This avoids double-invoking the capability chain — once with real events inside the activity, again on an empty stream in the workflow loop.
- Tests: direct unit tests for the helper cover all branches (agent/no-agent × handler/no-handler); `HandleEventStream` tests cover callable-class processors and observer early-bailout.
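The dispatch change is worth illustrating: `inspect.isasyncgenfunction` reports on the object it is given, so it returns `False` for a callable-class instance even when its `__call__` is an async generator, whereas probing the call's return value classifies the instance correctly. A standalone sketch (names hypothetical):

```python
import inspect
from collections.abc import AsyncIterator

class UpperCaser:
    # __call__ is an async generator function, but the instance itself is not.
    async def __call__(self, events: AsyncIterator[str]) -> AsyncIterator[str]:
        async for e in events:
            yield e.upper()

processor = UpperCaser()

# The function-based check misses callable-class processors:
print(inspect.isasyncgenfunction(processor))  # False

async def empty() -> AsyncIterator[str]:
    return
    yield  # unreachable, but makes this function an async generator

# Calling an async generator function returns an async generator object
# without running its body, so probing the return value is safe and correct:
probe = processor(empty())
print(isinstance(probe, AsyncIterator))  # True
```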
- Revert the `isinstance(stream, CompletedStreamedResponse)` guard in `Agent.run()`'s streaming loop: `n.stream()` yields an `AgentStream` wrapper, not the underlying `StreamedResponse`, so the guard was a no-op that never triggered. The double-invocation behavior around durable-exec's `CompletedStreamedResponse` is real but needs a different fix; leaving it for a followup.
- Simplify `run_event_stream_through_capabilities`: always iterate the (possibly capability-wrapped) stream when no handler is provided, since the helper is only invoked when at least one of them needs to see events. Drops the stale `elif wrapped is not stream` branch.
- Tighten the direct unit tests to match the simpler helper contract.
When a user registers `HandleEventStream` on a `TemporalAgent` without an explicit `event_stream_handler`, `AbstractAgent.run()` still calls `TemporalModel.request_stream` because a capability overrides `wrap_run_event_stream`. The previous assertion rejected that scenario; allow it through.
HandleEventStream capability
Back out the approach where durable-exec model wrappers reached into `self._agent.root_capability.wrap_run_event_stream(...)` to fire the capability chain inside the activity/step. Reasons:

- Durable wrappers are documented as using only the public Agent API; reaching into `root_capability` crosses that line.
- The outer `AbstractAgent.run()` loop still wrapped the `CompletedStreamedResponse` returned from the durable call, so the handler fired twice per model request. A marker to suppress the second wrap is possible but wants more thought once PR #4977 (which ports durable exec to capabilities) lands.

The in-activity wrapping is removed from all three durable models and the `run_event_stream_through_capabilities` helper is deleted. `HandleEventStream`'s docstring now spells out the durable limitation and points at the capability-port work. `event_stream_handler=` on the agent continues to work inside durable runs exactly as before. (`AbstractAgent.root_capability` abstract property and `WrapperAgent`'s delegation stay — they're small and correct regardless of whether durable code currently reaches for them.)
| async with anyio.create_task_group() as tg:
|     tg.start_soon(run_handler)
|     async with send_stream:
|         handler_alive = True
|         async for event in stream:
|             if handler_alive:
|                 try:
|                     await send_stream.send(event)
|                 except (anyio.BrokenResourceError, anyio.ClosedResourceError):
|                     # Handler bailed early; keep forwarding events downstream.
|                     handler_alive = False
|             yield event
🚩 Observer handler exceptions propagate through task group and crash the stream
If the observer handler raises an exception inside run_handler() (line 82-84), the anyio.create_task_group at line 86 will cancel the main loop and propagate the exception as an ExceptionGroup. This means a failing observer can crash the downstream stream — which somewhat contradicts the docstring's claim that observers pass events 'without interfering' (line 31-33). The test_observer_bailout_does_not_break_downstream test covers early-return but not exception-throwing. This is arguably the correct behavior (user errors should surface), but the docstring could be more explicit about error propagation semantics.
| EventStreamProcessor: TypeAlias = Callable[
|     [RunContext[AgentDepsT], AsyncIterable[_messages.AgentStreamEvent]],
|     AsyncIterator[_messages.AgentStreamEvent],
| ]
| """An async generator that receives agent [`RunContext`][pydantic_ai.tools.RunContext] and an async iterable of events and yields a potentially modified stream.
|
| Used with the [`HandleEventStream`][pydantic_ai.capabilities.HandleEventStream] capability to modify, drop, or add events visible to the rest of the capability chain."""
🚩 EventStreamProcessor type alias not exported from pydantic_ai.agent
The new EventStreamProcessor type alias is defined at pydantic_ai_slim/pydantic_ai/agent/abstract.py:74 but is not added to pydantic_ai/agent/__init__.py's imports or __all__, unlike the existing EventStreamHandler which IS exported (line 89, 113 of agent/__init__.py). The HandleEventStream docstring references [pydantic_ai.agent.EventStreamProcessor] which would be a broken mkdocs link since the symbol isn't accessible at pydantic_ai.agent. Users wanting to annotate processor handlers would need to import from pydantic_ai.agent.abstract instead of pydantic_ai.agent.
| if not yielded:
| -    raise exceptions.AgentRunError('Agent run finished without producing a final result')  # pragma: no cover
| +    raise exceptions.AgentRunError('Agent run finished without producing a final result')
🚩 Removal of pragma: no cover on unreachable line
The # pragma: no cover was removed from the raise AgentRunError('Agent run finished without producing a final result') at line 766 of abstract.py. Analysis of the run_stream control flow shows this line remains unreachable: the while not yielded loop can only exit via break (which always sets yielded = True first) or exception. The removal aligns with the coding guidelines (rule:97: 'Avoid # pragma: no cover'), but without a test covering this line, CI coverage checks may flag it. This is a coverage annotation concern rather than a correctness issue.
- Re-export `EventStreamProcessor` from `pydantic_ai.agent` alongside `EventStreamHandler`. Fixes the mkdocs cross-reference in the `HandleEventStream` docstring and lets users annotate processor handlers with the canonical import path.
- Spell out in the docstring what happens when an observer handler returns early (stops receiving events, downstream unaffected) vs. raises (exception propagates to the rest of the run).
After merging main, line 814 is now flagged as missing from coverage. This branch can't be reached from normal test paths: the surrounding `while not yielded:` loop only exits when `yielded=True` or via an exception that propagates past the `if` entirely. The earlier strict-no-cover flag that got me to remove this pragma was a false positive; the merge seems to have resolved it.
| @dataclass
| class HandleEventStream(AbstractCapability[AgentDepsT]):
What do you think about calling this ProcessEventStream instead, since "handle" underplays its potential side effects? And maybe capabilities.HistoryProcessor should become ProcessHistory in v2 as well.
Renamed to ProcessEventStream in 869e257a. Filed a tracker plan for HistoryProcessor → ProcessHistory at ~/.claude/plans/history-processor-rename.md — will pick that up as a separate PR so this one stays focused.
| durable model consumes those inside the activity before returning. The in-flight
| `event_stream_handler` parameter does still observe the live events, and a future
| refactor threading the capability chain through the activity boundary is tracked
| separately.
| @property
| @abstractmethod
| def root_capability(self) -> CombinedCapability[AgentDepsT]:
Does this require all subclasses to implement this now, even existing ones out in the wild? That'd be a breaking change. I'd rather have it just raise in that case, not block the subclass creation.
Good catch. Dropped @abstractmethod in 869e257a; the default raises NotImplementedError on access but no longer blocks subclass instantiation.
…ink PR #4977, drop abstractmethod on root_capability

- Rename `HandleEventStream` to `ProcessEventStream` (and file `event_stream_handler.py` → `process_event_stream.py`). "Handle" underplayed the transformer form's side effects.
- Link the durable-execution note to PR #4977 so readers can follow the in-flight work.
- Drop `@abstractmethod` from `AbstractAgent.root_capability` — existing third-party subclasses shouldn't break; raising `NotImplementedError` on access is enough.
| send_stream, receive_stream = anyio.create_memory_object_stream[AgentStreamEvent]()
|
| async def run_handler() -> None:
|     async with receive_stream:
|         await observer(ctx, receive_stream)
|
| async with anyio.create_task_group() as tg:
|     tg.start_soon(run_handler)
|     async with send_stream:
|         handler_alive = True
|         async for event in stream:
|             if handler_alive:
|                 try:
|                     await send_stream.send(event)
|                 except (anyio.BrokenResourceError, anyio.ClosedResourceError):
|                     # Handler bailed early; keep forwarding events downstream.
|                     handler_alive = False
|             yield event
📝 Info: Observer path uses zero-buffer memory stream creating synchronous backpressure
The observer path at pydantic_ai_slim/pydantic_ai/capabilities/process_event_stream.py:83-100 uses anyio.create_memory_object_stream[AgentStreamEvent]() with the default buffer size of 0 (rendezvous channel). This means send_stream.send(event) blocks until the observer calls receive(). Combined with the sequential yield event after the send, this creates a pipeline where each event is first delivered to the observer, then yielded downstream. A slow observer will back-pressure the entire event stream pipeline. This is documented in the docstring ('Events are delivered synchronously, so a slow handler back-pressures the rest of the stream') and is consistent with the design intent, but users should be aware that observer handlers should be lightweight to avoid latency.
Good call on the back-pressure coupling. Documented the synchronous delivery in cb420275 — reworded "without interfering" to "without changing each other's view" and added an explicit line about slow handlers back-pressuring the rest of the stream. Keeping rendezvous (default max_buffer_size=0) intentionally; adding a buffer would risk unbounded memory growth for pathologically slow observers, and events are low-frequency enough in practice that typical record/print observers aren't a concern.
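The throttling effect of a small (or zero) buffer can be shown without anyio. In this sketch an `asyncio.Queue` with `maxsize=1` stands in for the memory object stream (a true rendezvous channel is even stricter): the producer cannot outrun the slow observer, so total wall time is set by the observer's pace.

```python
import asyncio
import time

async def main() -> float:
    # maxsize=1 approximates anyio's zero-buffer memory object stream:
    # put() blocks while the single slot is occupied.
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)

    async def slow_observer() -> None:
        for _ in range(5):
            await q.get()
            await asyncio.sleep(0.05)  # slow handler work

    consumer = asyncio.create_task(slow_observer())
    start = time.monotonic()
    for i in range(5):
        await q.put(i)  # blocks while the buffer is full: back-pressure
    await consumer
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(elapsed >= 0.2)  # producer was throttled to roughly 5 x 0.05s
```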
HandleEventStream capability → ProcessEventStream capability
| @classmethod
| def get_serialization_name(cls) -> str | None:
|     return None  # Not spec-serializable (takes a callable)
ProcessEventStream doesn't override get_ordering(), so it defaults to None (no position constraint). The PR description says it's "Ordered 'outermost'", and the docstring says "multiple handlers … can all see the same stream without changing each other's view" (implying it wraps outside other capabilities), but this is only true if the user happens to list ProcessEventStream before other capabilities in the list — the topological sort doesn't guarantee it.
You should add a get_ordering() override returning CapabilityOrdering(position='outermost') to match the stated semantics, similar to how _ToolSearchCapability does it. The test test_handler_sees_events_after_inner_wrappers currently passes by coincidence of list order and would break if the user swapped the capability order.
Good catch on the mismatch — but I think the right fix is to drop the "outermost" claim, not enforce it. List-position ordering is the natural contract for user-controlled capabilities: CombinedCapability.wrap_run_event_stream wraps in reversed(self.capabilities) order (list-earlier = outermost), and sort_capabilities preserves insertion order as a tiebreaker when no explicit constraints apply. HistoryProcessor doesn't force a position either; only _ToolSearchCapability does, and it's a special internal case. Forcing outermost here would remove the user's ability to stack a ProcessEventStream (say, for logging raw events) inside another capability that transforms the stream. Updated the PR description to remove the outermost claim and renamed the test intent to reflect that the ordering follows list position.
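The "list-earlier = outermost" contract is a consequence of wrapping in reversed list order, which is easy to see with plain generators. A minimal sketch (the `make_wrapper` helper is hypothetical; only the reversed-iteration detail comes from the thread):

```python
from collections.abc import Callable, Iterable, Iterator

def make_wrapper(name: str) -> Callable[[Iterable[str]], Iterator[str]]:
    """Build a wrapper that tags every event it forwards with its own name."""
    def wrap(stream: Iterable[str]) -> Iterator[str]:
        for event in stream:
            yield f"{name}({event})"
    return wrap

capabilities = [make_wrapper("outer"), make_wrapper("inner")]

stream: Iterable[str] = ["evt"]
for cap in reversed(capabilities):  # later entries wrap first, ending up innermost
    stream = cap(stream)

print(list(stream))  # ['outer(inner(evt))']
```

So a capability listed earlier sees events only after all later-listed wrappers have transformed them, without any explicit ordering constraint.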
| 'BuiltinTool',
| 'BuiltinOrLocalTool',
| 'CAPABILITY_TYPES',
| 'ProcessEventStream',
The concrete capabilities section (from ImageGeneration through WebSearch) is alphabetically ordered. ProcessEventStream should be placed between PrepareTools and ReinjectSystemPrompt to maintain this.
Fixed in 9d4b45cb — moved ProcessEventStream between PrepareTools and ReinjectSystemPrompt.
The PR description has
Checklist
Summary
Introduces `ProcessEventStream`, a capability wrapping a user-provided event stream handler so it can participate in the capability middleware chain. Its position in the capability list determines nesting order, same as other hook-overriding capabilities — list earlier = wraps outside.

Supports two handler forms:

- Observer ([`EventStreamHandler`][pydantic_ai.agent.EventStreamHandler]) — `async def` returning `None`. Events are teed to the handler while also flowing through to the rest of the chain. Synchronous delivery, so a slow observer back-pressures the downstream stream.
- Processor ([`EventStreamProcessor`][pydantic_ai.agent.EventStreamProcessor]) — async generator yielding events. Its output replaces the inner stream for downstream wrappers, so it can modify, drop, or add events.
event_stream_handlerparam onAgent/Agent.runis untouched — both approaches coexist.abstract.py's existinghas_wrap_run_event_streamcheck already auto-enables streaming when any capability overrides the hook, so no changes were needed there.Durable-execution integrations (Temporal/DBOS/Prefect) consume model events inside the activity/step boundary, so this capability only sees tool-call events and the post-streaming batch under those wrappers; the live event path remains available via the
event_stream_handlerparam. Threading the capability chain through the activity boundary is being explored in #4977.Test plan
- `TestProcessEventStream::test_handler_receives_events` — handler registered via capability receives events end-to-end
- `TestProcessEventStream::test_multiple_handlers_and_param_all_observe` — multiple capabilities and the param handler all see the same stream
- `TestProcessEventStream::test_handler_sees_events_after_inner_wrappers` — when listed before an inner wrapper, the handler sees the transformed stream
- `TestProcessEventStream::test_transformer_handler_replaces_stream` — processor form modifies the stream for downstream consumers
- `TestProcessEventStream::test_callable_instance_processor` — callable-class processors dispatch correctly
- `TestProcessEventStream::test_observer_bailout_does_not_break_downstream` — observer early-return doesn't crash downstream
- `TestProcessEventStream::test_not_spec_serializable` — capability opts out of spec construction (takes a callable)
- `tests/test_capabilities.py` suite, `tests/test_streaming.py`, `tests/test_agent.py` all pass