Skip to content

Add ProcessEventStream capability#5141

Merged
DouweM merged 18 commits intomainfrom
event-stream-handler-cap
Apr 24, 2026
Merged

Add ProcessEventStream capability#5141
DouweM merged 18 commits intomainfrom
event-stream-handler-cap

Conversation

@DouweM
Copy link
Copy Markdown
Collaborator

@DouweM DouweM commented Apr 17, 2026

  • Closes #

Checklist

  • Any AI generated code has been reviewed line-by-line by the human PR author, who stands by it.
  • No breaking changes in accordance with the version policy.
  • PR title is fit for the release changelog.

Summary

Introduces ProcessEventStream, a capability wrapping a user-provided event stream handler so it can participate in the capability middleware chain. Its position in the capability list determines nesting order, same as other hook-overriding capabilities — list earlier = wraps outside.

Supports two handler forms:

  • Observer ([EventStreamHandler][pydantic_ai.agent.EventStreamHandler]) — async def returning None. Events are teed to the handler while also flowing through to the rest of the chain. Synchronous delivery, so a slow observer back-pressures the downstream stream.
  • Processor ([EventStreamProcessor][pydantic_ai.agent.EventStreamProcessor]) — async generator yielding events. Its output replaces the inner stream for downstream wrappers, so it can modify, drop, or add events.

The existing event_stream_handler param on Agent / Agent.run is untouched — both approaches coexist. abstract.py's existing has_wrap_run_event_stream check already auto-enables streaming when any capability overrides the hook, so no changes were needed there.

Durable-execution integrations (Temporal/DBOS/Prefect) consume model events inside the activity/step boundary, so this capability only sees tool-call events and the post-streaming batch under those wrappers; the live event path remains available via the event_stream_handler param. Threading the capability chain through the activity boundary is being explored in #4977.

Test plan

  • TestProcessEventStream::test_handler_receives_events — handler registered via capability receives events end-to-end
  • TestProcessEventStream::test_multiple_handlers_and_param_all_observe — multiple capabilities and the param handler all see the same stream
  • TestProcessEventStream::test_handler_sees_events_after_inner_wrappers — when listed before an inner wrapper, the handler sees the transformed stream
  • TestProcessEventStream::test_transformer_handler_replaces_stream — processor form modifies the stream for downstream consumers
  • TestProcessEventStream::test_callable_instance_processor — callable-class processors dispatch correctly
  • TestProcessEventStream::test_observer_bailout_does_not_break_downstream — observer early-return doesn't crash downstream
  • TestProcessEventStream::test_not_spec_serializable — capability opts out of spec construction (takes a callable)
  • Full tests/test_capabilities.py suite, tests/test_streaming.py, tests/test_agent.py all pass

Wraps a user-provided event stream handler as a capability, allowing it
to participate in the capability middleware chain instead of requiring
the dedicated `event_stream_handler` argument on `Agent`/`Agent.run`.
Ordered 'outermost' so the handler sees events after all other
`wrap_run_event_stream` wrappers have transformed the stream.

Both approaches coexist: the existing `event_stream_handler` params
remain unchanged.
@github-actions github-actions Bot added size: S Small PR (≤100 weighted lines) feature New feature request, or PR implementing a feature (enhancement) labels Apr 17, 2026
Comment on lines +18 to +29
"""A capability that consumes the agent's event stream via a user-provided async handler.

The handler receives the stream of [`AgentStreamEvent`][pydantic_ai.messages.AgentStreamEvent]s
emitted during model streaming and tool execution for each
[`ModelRequestNode`][pydantic_ai._agent_graph.ModelRequestNode] and
[`CallToolsNode`][pydantic_ai._agent_graph.CallToolsNode].

When this capability is registered, [`agent.run()`][pydantic_ai.Agent.run] automatically
enables streaming so the handler fires without requiring an explicit `event_stream_handler`
argument. The capability is ordered `'outermost'` so the handler sees events after all
other [`wrap_run_event_stream`][pydantic_ai.capabilities.AbstractCapability.wrap_run_event_stream]
wrappers have transformed the stream.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring mentions that the capability works "without requiring an explicit event_stream_handler argument," but it doesn't warn about what happens if both are set. Looking at abstract.py:309-326, when both are present, HandleEventStream.wrap_run_event_stream consumes the stream as the outermost wrapper, and the event_stream_handler param handler receives an empty iterable.

Worth adding a note that this capability is mutually exclusive with the event_stream_handler parameter, or alternatively adding a runtime guard (e.g. in for_run) that raises when both are configured.

Comment on lines +22 to +23
[`ModelRequestNode`][pydantic_ai._agent_graph.ModelRequestNode] and
[`CallToolsNode`][pydantic_ai._agent_graph.CallToolsNode].
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mkdocstrings cross-references here point to private module pydantic_ai._agent_graph — these won't render as working links in the docs and expose internal architecture. Consider either removing them, or referencing the public-facing concepts (e.g. "model request and tool execution nodes") without linking to private classes.

devin-ai-integration[bot]

This comment was marked as resolved.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 17, 2026

- Switch HandleEventStream to a tee: forward events downstream while also
  feeding the handler via an internal memory object stream. Multiple
  capabilities (and the explicit `event_stream_handler` param) can now
  observe the same stream without shadowing each other.
- Drop `outermost` ordering — no longer needed with the tee design.
- Drop the private `_agent_graph` module links from the docstring.
- Add HandleEventStream row to the capabilities.md table.
devin-ai-integration[bot]

This comment was marked as resolved.

The `EventStreamHandler` type alias now accepts either an observer (async def
returning None) or a transformer (async generator yielding AgentStreamEvents).
Both forms are supported by `HandleEventStream` and the top-level
`event_stream_handler` parameter:

- Observer: the capability tees events to the handler while passing them
  through unchanged to the rest of the chain.
- Transformer: the capability iterates the handler's output and forwards
  those events to the chain, letting the handler modify, drop, or add events.

At the `event_stream_handler` parameter level the handler is the terminal
consumer, so observer and transformer are equivalent (both iterated to
completion via a shared `_consume_event_stream_handler` helper).
@github-actions github-actions Bot added size: M Medium PR (101-500 weighted lines) and removed size: S Small PR (≤100 weighted lines) labels Apr 17, 2026
devin-ai-integration[bot]

This comment was marked as resolved.

All three durable-exec backends (prefect, temporal, dbos) call the user's
event stream handler directly in their wrapper models/agents. Switch those
sites to the shared `consume_event_stream_handler` helper in
`agent/abstract.py` so both observer and transformer handler forms work.
devin-ai-integration[bot]

This comment was marked as resolved.

DouweM added 2 commits April 17, 2026 23:26
…els to capability chain

- Revert `EventStreamHandler` to its original observer-only signature
  (`Awaitable[None]`). The param-level handler is a terminal consumer where
  a transformer return has nowhere to go.
- Introduce `EventStreamProcessor` (async generator yielding events) as a
  sibling type used only where transformation is meaningful.
- `HandleEventStream.handler` accepts either; it dispatches via
  `inspect.isasyncgenfunction` — tee for observers, iterate-and-yield for
  processors. Name follows the `HistoryProcessor` convention already used
  for the history-processing capability.
- Remove the cross-module `consume_event_stream_handler` helper; call sites
  just `await handler(...)` again.

Also thread the owning agent through each durable-exec model wrapper
(`TemporalModel` already had it; `DBOSModel` and `PrefectModel` now do),
and route the streamed response through `agent.root_capability.wrap_run_event_stream`
inside the activity/step before passing it to the handler. With no capability
overriding the hook this is a transparent pass-through, so existing event
stream behavior and span output are unchanged; `HandleEventStream` (and any
other capability overriding `wrap_run_event_stream`) now fires inside the
durable activity rather than being silently ignored.

Expose `root_capability` on `AbstractAgent` / `WrapperAgent` so wrapper
models can reach it polymorphically.
- Extend test_wrapper_agent to check root_capability delegates.
- Add HandleEventStream with a no-op observer handler to the shared
  complex_agent fixtures in test_dbos.py and test_temporal.py. Since the
  capability just tees events, the param-level event_stream_handler
  continues to see the same event sequence (snapshots unchanged), and the
  activity-level wrap_run_event_stream routing now gets exercised
  realistically.
devin-ai-integration[bot]

This comment was marked as resolved.

…le-invoke

- Extract a `run_event_stream_through_capabilities` helper in
  `agent/abstract.py` and call it from all three durable-exec model
  wrappers (DBOS/Prefect/Temporal). Replaces the gated inline code that
  was skipping capability wrapping whenever `event_stream_handler` was
  unset, so a user who registers only `HandleEventStream` on the agent
  now sees real model events inside the activity/step.
- `HandleEventStream.wrap_run_event_stream` now dispatches by inspecting
  the handler's return value (`isinstance(probe, AsyncIterator)`) rather
  than `inspect.isasyncgenfunction`, so callable-class processors whose
  `__call__` is an async generator work correctly too.
- Tee loop catches `BrokenResourceError`/`ClosedResourceError` so a
  handler that iterates once and returns no longer crashes the wrap —
  downstream consumers keep receiving events.
- In `Agent.run()`'s streaming loop, skip the outer
  `wrap_run_event_stream` pass when the stream is a
  `CompletedStreamedResponse` (the marker durable wrappers return after
  consuming events inside an activity/step). This avoids double-invoking
  the capability chain — once with real events inside the activity, again
  on an empty stream in the workflow loop.
- Tests: direct unit tests for the helper cover all branches
  (agent/no-agent × handler/no-handler); `HandleEventStream` tests cover
  callable-class processors and observer early-bailout.
devin-ai-integration[bot]

This comment was marked as resolved.

DouweM added 2 commits April 18, 2026 00:26
- Revert the `isinstance(stream, CompletedStreamedResponse)` guard in
  `Agent.run()`'s streaming loop: `n.stream()` yields an `AgentStream`
  wrapper, not the underlying `StreamedResponse`, so the guard was a
  no-op that never triggered. The double-invocation behavior around
  durable-exec's `CompletedStreamedResponse` is real but needs a
  different fix; leaving it for a followup.
- Simplify `run_event_stream_through_capabilities`: always iterate the
  (possibly capability-wrapped) stream when no handler is provided,
  since the helper is only invoked when at least one of them needs to
  see events. Drops the stale `elif wrapped is not stream` branch.
- Tighten the direct unit tests to match the simpler helper contract.
When a user registers `HandleEventStream` on a `TemporalAgent` without an
explicit `event_stream_handler`, `AbstractAgent.run()` still calls
`TemporalModel.request_stream` because a capability overrides
`wrap_run_event_stream`. The previous assertion rejected that scenario;
allow it through.
@pydantic pydantic deleted a comment from nidhishgajjar Apr 18, 2026
@dsfaccini dsfaccini changed the title Add HandleEventStream capability Add HandleEventStream capability Apr 18, 2026
Back out the approach where durable-exec model wrappers reached into
`self._agent.root_capability.wrap_run_event_stream(...)` to fire the
capability chain inside the activity/step. Reasons:

- Durable wrappers are documented as using only the public Agent API;
  reaching into `root_capability` crosses that line.
- The outer `AbstractAgent.run()` loop still wrapped the
  `CompletedStreamedResponse` returned from the durable call, so the
  handler fired twice per model request. A marker to suppress the
  second wrap is possible but wants more thought once PR #4977 (which
  ports durable exec to capabilities) lands.

The in-activity wrapping is removed from all three durable models and
the `run_event_stream_through_capabilities` helper is deleted.
`HandleEventStream`'s docstring now spells out the durable limitation
and points at the capability-port work. `event_stream_handler=` on the
agent continues to work inside durable runs exactly as before.

(`AbstractAgent.root_capability` abstract property and `WrapperAgent`'s
delegation stay — they're small and correct regardless of whether
durable code currently reaches for them.)
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 4 additional findings in Devin Review.

Open in Devin Review

Comment on lines +86 to +97
async with anyio.create_task_group() as tg:
tg.start_soon(run_handler)
async with send_stream:
handler_alive = True
async for event in stream:
if handler_alive:
try:
await send_stream.send(event)
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
# Handler bailed early; keep forwarding events downstream.
handler_alive = False
yield event
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Observer handler exceptions propagate through task group and crash the stream

If the observer handler raises an exception inside run_handler() (line 82-84), the anyio.create_task_group at line 86 will cancel the main loop and propagate the exception as an ExceptionGroup. This means a failing observer can crash the downstream stream — which somewhat contradicts the docstring's claim that observers pass events 'without interfering' (line 31-33). The test_observer_bailout_does_not_break_downstream test covers early-return but not exception-throwing. This is arguably the correct behavior (user errors should surface), but the docstring could be more explicit about error propagation semantics.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

View 4 additional findings in Devin Review.

Open in Devin Review

Comment on lines +74 to +80
EventStreamProcessor: TypeAlias = Callable[
[RunContext[AgentDepsT], AsyncIterable[_messages.AgentStreamEvent]],
AsyncIterator[_messages.AgentStreamEvent],
]
"""An async generator that receives agent [`RunContext`][pydantic_ai.tools.RunContext] and an async iterable of events and yields a potentially modified stream.

Used with the [`HandleEventStream`][pydantic_ai.capabilities.HandleEventStream] capability to modify, drop, or add events visible to the rest of the capability chain."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 EventStreamProcessor type alias not exported from pydantic_ai.agent

The new EventStreamProcessor type alias is defined at pydantic_ai_slim/pydantic_ai/agent/abstract.py:74 but is not added to pydantic_ai/agent/__init__.py's imports or __all__, unlike the existing EventStreamHandler which IS exported (line 89, 113 of agent/__init__.py). The HandleEventStream docstring references [pydantic_ai.agent.EventStreamProcessor] which would be a broken mkdocs link since the symbol isn't accessible at pydantic_ai.agent. Users wanting to annotate processor handlers would need to import from pydantic_ai.agent.abstract instead of pydantic_ai.agent.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if not yielded:
raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover
raise exceptions.AgentRunError('Agent run finished without producing a final result')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Removal of pragma: no cover on unreachable line

The # pragma: no cover was removed from the raise AgentRunError('Agent run finished without producing a final result') at line 766 of abstract.py. Analysis of the run_stream control flow shows this line remains unreachable: the while not yielded loop can only exit via break (which always sets yielded = True first) or exception. The removal aligns with the coding guidelines (rule:97: 'Avoid # pragma: no cover'), but without a test covering this line, CI coverage checks may flag it. This is a coverage annotation concern rather than a correctness issue.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

DouweM added 3 commits April 24, 2026 17:22
- Re-export `EventStreamProcessor` from `pydantic_ai.agent` alongside
  `EventStreamHandler`. Fixes the mkdocs cross-reference in the
  `HandleEventStream` docstring and lets users annotate processor
  handlers with the canonical import path.
- Spell out in the docstring what happens when an observer handler
  returns early (stops receiving events, downstream unaffected) vs.
  raises (exception propagates to the rest of the run).
After merging main, line 814 is now flagged as missing from coverage.
This branch can't be reached from normal test paths: the surrounding
`while not yielded:` loop only exits when `yielded=True` or via an
exception that propagates past the `if` entirely. The earlier
strict-no-cover flag that got me to remove this pragma was a false
positive; the merge seems to have resolved it.


@dataclass
class HandleEventStream(AbstractCapability[AgentDepsT]):
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about calling this ProcessEventStream instead, since "handle" underplays its potential side effects? And maybe capabilities.HistoryProcessor should become ProcessHistory in v2 as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to ProcessEventStream in 869e257a. Filed a tracker plan for HistoryProcessorProcessHistory at ~/.claude/plans/history-processor-rename.md — will pick that up as a separate PR so this one stays focused.

durable model consumes those inside the activity before returning. The in-flight
`event_stream_handler` parameter does still observe the live events, and a future
refactor threading the capability chain through the activity boundary is tracked
separately.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked where? We should link to #4977.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linked to #4977 in 869e257a.


@property
@abstractmethod
def root_capability(self) -> CombinedCapability[AgentDepsT]:
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this require all subclasses to implement this now, even existing ones out in the wild? That'd be a breaking change. I'd rather have it just raise in that case, not block the subclass creation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Dropped @abstractmethod in 869e257a; the default raises NotImplementedError on access but no longer blocks subclass instantiation.

DouweM added 2 commits April 24, 2026 12:03
…ink PR #4977, drop abstractmethod on root_capability

- Rename `HandleEventStream` to `ProcessEventStream` (and file `event_stream_handler.py` → `process_event_stream.py`). "Handle" underplayed the transformer form's side effects.
- Link the durable-execution note to PR #4977 so readers can follow the in-flight work.
- Drop `@abstractmethod` from `AbstractAgent.root_capability` — existing third-party subclasses shouldn't break; raising `NotImplementedError` on access is enough.
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 6 additional findings in Devin Review.

Open in Devin Review

Comment on lines +82 to +99
send_stream, receive_stream = anyio.create_memory_object_stream[AgentStreamEvent]()

async def run_handler() -> None:
async with receive_stream:
await observer(ctx, receive_stream)

async with anyio.create_task_group() as tg:
tg.start_soon(run_handler)
async with send_stream:
handler_alive = True
async for event in stream:
if handler_alive:
try:
await send_stream.send(event)
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
# Handler bailed early; keep forwarding events downstream.
handler_alive = False
yield event
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Observer path uses zero-buffer memory stream creating synchronous backpressure

The observer path at pydantic_ai_slim/pydantic_ai/capabilities/process_event_stream.py:83-100 uses anyio.create_memory_object_stream[AgentStreamEvent]() with the default buffer size of 0 (rendezvous channel). This means send_stream.send(event) blocks until the observer calls receive(). Combined with the sequential yield event after the send, this creates a pipeline where each event is first delivered to the observer, then yielded downstream. A slow observer will back-pressure the entire event stream pipeline. This is documented in the docstring ('Events are delivered synchronously, so a slow handler back-pressures the rest of the stream') and is consistent with the design intent, but users should be aware that observer handlers should be lightweight to avoid latency.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call on the back-pressure coupling. Documented the synchronous delivery in cb420275 — reworded "without interfering" to "without changing each other's view" and added an explicit line about slow handlers back-pressuring the rest of the stream. Keeping rendezvous (default max_buffer_size=0) intentionally; adding a buffer would risk unbounded memory growth for pathologically slow observers, and events are low-frequency enough in practice that typical record/print observers aren't a concern.

@DouweM DouweM changed the title Add HandleEventStream capability Add ProcessEventStream capability Apr 24, 2026

@classmethod
def get_serialization_name(cls) -> str | None:
return None # Not spec-serializable (takes a callable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessEventStream doesn't override get_ordering(), so it defaults to None (no position constraint). The PR description says it's "Ordered 'outermost'", and the docstring says "multiple handlers … can all see the same stream without changing each other's view" (implying it wraps outside other capabilities), but this is only true if the user happens to list ProcessEventStream before other capabilities in the list — the topological sort doesn't guarantee it.

You should add a get_ordering() override returning CapabilityOrdering(position='outermost') to match the stated semantics, similar to how _ToolSearchCapability does it. The test test_handler_sees_events_after_inner_wrappers currently passes by coincidence of list order and would break if the user swapped the capability order.

@DouweM

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the mismatch — but I think the right fix is to drop the "outermost" claim, not enforce it. List-position ordering is the natural contract for user-controlled capabilities: CombinedCapability.wrap_run_event_stream wraps in reversed(self.capabilities) order (list-earlier = outermost), and sort_capabilities preserves insertion order as a tiebreaker when no explicit constraints apply. HistoryProcessor doesn't force a position either; only _ToolSearchCapability does, and it's a special internal case. Forcing outermost here would remove the user's ability to stack a ProcessEventStream (say, for logging raw events) inside another capability that transforms the stream. Updated the PR description to remove the outermost claim and renamed the test intent to reflect that the ordering follows list position.

'BuiltinTool',
'BuiltinOrLocalTool',
'CAPABILITY_TYPES',
'ProcessEventStream',
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concrete capabilities section (from ImageGeneration through WebSearch) is alphabetically ordered. ProcessEventStream should be placed between PrepareTools and ReinjectSystemPrompt to maintain this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9d4b45cb — moved ProcessEventStream between PrepareTools and ReinjectSystemPrompt.

@github-actions
Copy link
Copy Markdown
Contributor

The PR description has Closes # with no issue number. Is there a tracking issue for this feature, or should one be filed?

@DouweM DouweM merged commit 4071cdb into main Apr 24, 2026
60 checks passed
@DouweM DouweM deleted the event-stream-handler-cap branch April 24, 2026 18:49
Alex-Resch pushed a commit to Alex-Resch/pydantic-ai that referenced this pull request Apr 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New feature request, or PR implementing a feature (enhancement) size: M Medium PR (101-500 weighted lines)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant