
Pydantic AI Implementation Guide

Table of Contents
1. Introduction
2. Quick Start Guide
3. Setting Up Your Environment
4. Defining Agents
5. Creating and Registering Tools
6. Dependency Injection
7. MCP Server Setup
8. Agent Types and Use Cases
9. End-to-End Example
10. Testing and Debugging
11. Deployment Considerations
12. Appendix: Advanced Patterns

Introduction
This implementation guide provides practical instructions and code examples for
building an AI agent platform using Pydantic AI. It covers everything from basic agent
setup to complex workflows and deployment strategies. The guide is designed to help
developers implement the architecture described in the platform architecture
document while following the best practices outlined in the best practices guide.

Quick Start Guide

Here’s a checklist to get your Pydantic AI agent platform up and running quickly:

1. Install Pydantic AI and dependencies
2. Define your agent types and their capabilities
3. Create tools and register them with your agents
4. Set up dependency injection for external services
5. Configure the MCP server for agent communication
6. Implement monitoring and observability
7. Deploy your platform

Let’s start with a simple example:

# Install Pydantic AI
# pip install pydantic-ai

from pydantic_ai import Agent

# Define a simple agent
agent = Agent(
    'openai:gpt-4',
    system_prompt="You are a helpful assistant that provides concise answers."
)

# Run the agent
response = agent.run_sync("What is Pydantic AI?")
print(response.output)

Setting Up Your Environment

Installation

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install Pydantic AI and dependencies
pip install pydantic-ai
pip install fastapi uvicorn        # For MCP server
pip install pytest pytest-asyncio  # For testing
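
The Dockerfile later in this guide copies a requirements.txt into the image; a minimal sketch matching the installs above (pin exact versions for reproducible builds):

# requirements.txt
pydantic-ai
fastapi
uvicorn
pytest
pytest-asyncio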

Project Structure

A typical Pydantic AI platform project might be structured as follows:

my_ai_platform/
├── agents/
│   ├── __init__.py
│   ├── base.py
│   ├── retrieval.py
│   ├── planning.py
│   └── conversation.py
├── tools/
│   ├── __init__.py
│   ├── registry.py
│   ├── search.py
│   ├── database.py
│   └── external_apis.py
├── services/
│   ├── __init__.py
│   ├── database.py
│   ├── vector_store.py
│   └── authentication.py
├── models/
│   ├── __init__.py
│   ├── input_types.py
│   └── output_types.py
├── mcp/
│   ├── __init__.py
│   ├── server.py
│   └── client.py
├── config/
│   ├── __init__.py
│   ├── settings.py
│   └── logging.py
├── tests/
│   ├── __init__.py
│   ├── test_agents.py
│   └── test_tools.py
├── main.py
├── Dockerfile
└── docker-compose.yml

Defining Agents
Agents are the core building blocks of your AI platform. Here’s how to define different
types of agents:

Basic Agent

from pydantic_ai import Agent

# Simple agent with a static system prompt
basic_agent = Agent(
    'openai:gpt-4',
    system_prompt="You are a helpful assistant that provides concise answers."
)

Agent with Structured Output

from pydantic_ai import Agent
from pydantic import BaseModel, Field
from enum import Enum

class Sentiment(Enum):
    POSITIVE = "positive"
    NEUTRAL = "neutral"
    NEGATIVE = "negative"

class SentimentAnalysis(BaseModel):
    sentiment: Sentiment
    confidence: float = Field(ge=0.0, le=1.0)
    explanation: str

# Agent that returns structured output
sentiment_agent = Agent(
    'anthropic:claude-3-opus-20240229',
    system_prompt="Analyze the sentiment of the given text.",
    output_type=SentimentAnalysis
)

result = sentiment_agent.run_sync("I absolutely love this product! It's amazing!")

# With output_type set, the validated object is available as result.output
print(f"Sentiment: {result.output.sentiment}")
print(f"Confidence: {result.output.confidence}")
print(f"Explanation: {result.output.explanation}")

Agent with Dynamic System Prompt

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

@dataclass
class UserContext:
    user_name: str
    user_preferences: dict
    language: str = "en"

# Agent with dynamic system prompt based on user context
personalized_agent = Agent(
    'google-gla:gemini-1.5-pro',
    deps_type=UserContext
)

@personalized_agent.system_prompt
def personalized_prompt(ctx: RunContext[UserContext]) -> str:
    return f"""
    You are a personal assistant for {ctx.deps.user_name}.
    Preferred language: {ctx.deps.language}
    User preferences: {ctx.deps.user_preferences}

    Always tailor your responses to match these preferences.
    """

# Using the agent with specific user context
user_context = UserContext(
    user_name="Alice",
    user_preferences={"tone": "casual", "detail_level": "high"},
    language="en"
)

response = personalized_agent.run_sync(
    "What restaurants would you recommend?",
    deps=user_context
)

Async Agent

from pydantic_ai import Agent
import asyncio

async_agent = Agent(
    'openai:gpt-4',
    system_prompt="You are a helpful assistant."
)

async def process_query(query: str):
    result = await async_agent.run(query)
    return result.output

# Run multiple queries concurrently
async def process_multiple_queries(queries: list[str]):
    tasks = [process_query(query) for query in queries]
    results = await asyncio.gather(*tasks)
    return results

# Example usage
queries = ["What is AI?", "Explain quantum computing", "How does blockchain work?"]
results = asyncio.run(process_multiple_queries(queries))

Stateful Agent

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ConversationState:
    history: List[dict]
    user_id: str
    last_topic: Optional[str] = None

# Stateful agent that maintains conversation history
stateful_agent = Agent(
    'anthropic:claude-3-sonnet-20240229',
    deps_type=ConversationState
)

@stateful_agent.system_prompt
def conversation_prompt(ctx: RunContext[ConversationState]) -> str:
    history_str = "\n".join([
        f"User: {msg['user']}\nAssistant: {msg['assistant']}"
        for msg in ctx.deps.history[-5:]  # Last 5 messages
    ])

    return f"""
    You are a conversational assistant for user {ctx.deps.user_id}.

    Recent conversation history:
    {history_str}

    Last topic discussed: {ctx.deps.last_topic or 'None'}

    Maintain context from the conversation history.
    """

# Example usage
state = ConversationState(
    history=[
        {"user": "Hello", "assistant": "Hi there! How can I help you today?"},
        {"user": "Tell me about machine learning", "assistant": "Machine learning is..."}
    ],
    user_id="user123",
    last_topic="machine learning"
)

response = stateful_agent.run_sync("How is it different from deep learning?", deps=state)

# Update state with new interaction
state.history.append({
    "user": "How is it different from deep learning?",
    "assistant": response.output
})
state.last_topic = "deep learning vs machine learning"
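
Since every turn repeats the same bookkeeping, it can be worth wrapping it in a small helper. A minimal sketch building on the agent and state defined above:

def run_turn(user_message: str, state: ConversationState) -> str:
    """Run one conversation turn and record it in the state."""
    result = stateful_agent.run_sync(user_message, deps=state)
    state.history.append({"user": user_message, "assistant": result.output})
    return result.output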

Creating and Registering Tools

Tools allow agents to interact with external systems and perform specific functions.
Here’s how to create and register tools:

Basic Tool Registration

from pydantic_ai import Agent

weather_agent = Agent(
    'openai:gpt-4',
    system_prompt="You provide weather information."
)

@weather_agent.tool_plain
def get_current_temperature(city: str) -> float:
    """
    Get the current temperature in Celsius for the specified city.

    Args:
        city: The name of the city to get the temperature for

    Returns:
        The current temperature in Celsius
    """
    # In a real implementation, this would call a weather API
    # This is a mock implementation for demonstration
    mock_temperatures = {
        "new york": 22.5,
        "london": 18.0,
        "tokyo": 26.3,
        "sydney": 20.1
    }
    return mock_temperatures.get(city.lower(), 20.0)  # Default to 20°C if city not found
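
Once registered, the agent decides for itself when to call the tool during a run. A quick usage sketch:

# The model can call get_current_temperature("tokyo") while answering
response = weather_agent.run_sync("How warm is it in Tokyo right now?")
print(response.output)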

Tool with Context

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import aiohttp

@dataclass
class APICredentials:
    api_key: str
    base_url: str

search_agent = Agent(
    'anthropic:claude-3-haiku-20240307',
    deps_type=APICredentials,
    system_prompt="You help users find information online."
)

@search_agent.tool
async def search_web(ctx: RunContext[APICredentials], query: str, max_results: int = 5) -> list[dict]:
    """
    Search the web for information related to the query.

    Args:
        query: The search query
        max_results: Maximum number of results to return (default: 5)

    Returns:
        A list of search results, each containing a title, URL, and snippet
    """
    headers = {"Authorization": f"Bearer {ctx.deps.api_key}"}
    params = {"q": query, "limit": max_results}

    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{ctx.deps.base_url}/search", headers=headers, params=params
        ) as response:
            if response.status == 200:
                data = await response.json()
                return data.get("results", [])
            else:
                return []
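
Because the credentials arrive through deps, the same agent can run against different endpoints. A usage sketch (the base URL is a hypothetical placeholder):

creds = APICredentials(api_key="search-api-key", base_url="https://api.search.example.com")
result = search_agent.run_sync("Latest Pydantic AI releases", deps=creds)
print(result.output)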

Tool Registry

For larger applications, it’s helpful to organize tools in a central registry:

# tools/registry.py
from typing import Callable, Dict, List, Optional

class ToolRegistry:
    def __init__(self):
        self._tools: Dict[str, Dict[str, Callable]] = {}

    def register_tool(self, category: str, name: str, tool_func: Callable) -> None:
        """Register a tool function under a specific category."""
        if category not in self._tools:
            self._tools[category] = {}
        self._tools[category][name] = tool_func

    def get_tool(self, category: str, name: str) -> Optional[Callable]:
        """Get a specific tool by category and name."""
        return self._tools.get(category, {}).get(name)

    def get_category_tools(self, category: str) -> Dict[str, Callable]:
        """Get all tools in a category."""
        return self._tools.get(category, {})

    def get_all_tools(self) -> Dict[str, Dict[str, Callable]]:
        """Get all registered tools."""
        return self._tools

    def tools_for(self, category: Optional[str] = None) -> List[Callable]:
        """Return a flat list of tools (optionally filtered by category),
        suitable for passing to an Agent's `tools=` argument."""
        if category:
            return list(self.get_category_tools(category).values())
        return [
            tool_func
            for tools in self._tools.values()
            for tool_func in tools.values()
        ]

# Example usage
from pydantic_ai import Agent

registry = ToolRegistry()

# Register tools
def search_wikipedia(query: str) -> List[dict]:
    """Search Wikipedia for information."""
    # Implementation
    return []

def search_news(query: str, days: int = 7) -> List[dict]:
    """Search recent news articles."""
    # Implementation
    return []

registry.register_tool("search", "wikipedia", search_wikipedia)
registry.register_tool("search", "news", search_news)

# Create the agent with the registered tools; plain functions passed via
# `tools=` are registered the same way as @agent.tool_plain decorators
agent = Agent(
    'openai:gpt-4',
    system_prompt="You help find information.",
    tools=registry.tools_for("search")
)

Dependency Injection
Dependency injection allows you to provide external services and data to your agents
and tools:

Basic Dependency Injection

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class DatabaseService:
    connection_string: str

    async def query(self, sql: str) -> List[Dict[str, Any]]:
        # In a real implementation, this would connect to a database
        # This is a mock implementation for demonstration
        if "users" in sql.lower():
            return [
                {"id": 1, "name": "Alice", "email": "alice@example.com"},
                {"id": 2, "name": "Bob", "email": "bob@example.com"}
            ]
        return []

@dataclass
class AppDependencies:
    db: DatabaseService
    user_id: int
    api_key: str

# Create agent with dependencies
data_agent = Agent(
    'openai:gpt-4',
    deps_type=AppDependencies,
    system_prompt="You help users query data from the database."
)

@data_agent.tool
async def query_database(ctx: RunContext[AppDependencies], sql: str) -> List[Dict[str, Any]]:
    """
    Execute an SQL query against the database.

    Args:
        sql: The SQL query to execute

    Returns:
        The query results as a list of dictionaries
    """
    # Add security check to prevent destructive statements
    if "DROP" in sql.upper() or "DELETE" in sql.upper():
        raise ValueError("Destructive SQL operations are not allowed")

    # Log the query with user ID for audit purposes
    print(f"User {ctx.deps.user_id} executed query: {sql}")

    # Execute the query
    return await ctx.deps.db.query(sql)

# Example usage
db_service = DatabaseService(connection_string="postgresql://user:pass@localhost/db")
dependencies = AppDependencies(
    db=db_service,
    user_id=123,
    api_key="sk-api-key"
)

result = data_agent.run_sync(
    "Find all users in the database",
    deps=dependencies
)

Service Layer

For more complex applications, you might want to create a service layer:

# services/database.py
import asyncpg
from typing import List, Dict, Any, Optional

class DatabaseService:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        """Initialize the connection pool."""
        self.pool = await asyncpg.create_pool(self.connection_string)

    async def close(self):
        """Close the connection pool."""
        if self.pool:
            await self.pool.close()

    async def query(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
        """Execute a query and return results as dictionaries."""
        if not self.pool:
            await self.initialize()

        async with self.pool.acquire() as conn:
            stmt = await conn.prepare(sql)
            records = await stmt.fetch(*(params or []))
            return [dict(record) for record in records]

    async def execute(self, sql: str, params: Optional[List[Any]] = None) -> str:
        """Execute a command and return status."""
        if not self.pool:
            await self.initialize()

        async with self.pool.acquire() as conn:
            status = await conn.execute(sql, *(params or []))
            return status

# services/vector_store.py
from typing import List, Dict, Any

class VectorStoreService:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        # Initialize your vector database client here

    async def search(self, query_vector: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
        """Search for similar vectors."""
        # In a real implementation, this would query a vector database
        # This is a mock implementation for demonstration
        return [
            {"id": 1, "content": "Sample document 1", "score": 0.92},
            {"id": 2, "content": "Sample document 2", "score": 0.85}
        ]

    async def insert(self, id: str, vector: List[float], metadata: Dict[str, Any]) -> bool:
        """Insert a vector with metadata."""
        # Implementation
        return True

# services/service_container.py
from dataclasses import dataclass
from .database import DatabaseService
from .vector_store import VectorStoreService

@dataclass
class ServiceContainer:
    db: DatabaseService
    vector_store: VectorStoreService

    @classmethod
    def create_from_config(cls, config: dict):
        """Create a service container from configuration."""
        db = DatabaseService(config["database_url"])
        vector_store = VectorStoreService(config["vector_store_url"])
        return cls(db=db, vector_store=vector_store)

    async def initialize(self):
        """Initialize all services."""
        await self.db.initialize()
        # Initialize other services as needed

    async def close(self):
        """Close all services."""
        await self.db.close()
        # Close other services as needed
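
A minimal lifecycle sketch for the container, assuming a reachable Postgres and the config keys used by create_from_config (the vector store URL is a placeholder):

import asyncio

async def main():
    services = ServiceContainer.create_from_config({
        "database_url": "postgresql://user:pass@localhost/db",
        "vector_store_url": "https://vectors.example.com",  # hypothetical endpoint
    })
    await services.initialize()
    try:
        rows = await services.db.query("SELECT 1 AS ok")
        print(rows)
    finally:
        await services.close()

asyncio.run(main())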

MCP Server Setup

The Model Context Protocol (MCP) server provides a standardized way for agents to
communicate with external services:

Basic MCP Server

# mcp/server.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from typing import Dict, Any, Optional
import uvicorn

app = FastAPI(title="Pydantic AI MCP Server")

# Security
API_KEY = "your-secret-api-key"  # In production, use environment variables
api_key_header = APIKeyHeader(name="X-API-Key")

def verify_api_key(api_key: str = Depends(api_key_header)):
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_key

# Models
class MCPRequest(BaseModel):
    agent_id: str
    function_name: str
    parameters: Dict[str, Any]
    context: Optional[Dict[str, Any]] = None

class MCPResponse(BaseModel):
    status: str
    data: Any
    error: Optional[str] = None

# Routes
@app.post("/invoke", response_model=MCPResponse, dependencies=[Depends(verify_api_key)])
async def invoke_function(request: MCPRequest):
    try:
        # In a real implementation, this would dispatch to the appropriate function
        # This is a mock implementation for demonstration
        if request.function_name == "get_weather":
            # Mock weather data
            return MCPResponse(
                status="success",
                data={"temperature": 22.5, "condition": "sunny"}
            )
        elif request.function_name == "search_database":
            # Mock database search
            return MCPResponse(
                status="success",
                data=[{"id": 1, "name": "Example result"}]
            )
        else:
            return MCPResponse(
                status="error",
                data=None,
                error=f"Unknown function: {request.function_name}"
            )
    except Exception as e:
        return MCPResponse(
            status="error",
            data=None,
            error=str(e)
        )

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# Run the server
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
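
MCP Client

The project layout above includes an mcp/client.py. A minimal client sketch for the /invoke route just shown — the payload mirrors the server's MCPRequest model; the class name and everything else here are assumptions for illustration:

# mcp/client.py (a minimal sketch matching the server above)
import aiohttp
from typing import Any, Dict, Optional

class MCPClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key

    async def invoke(
        self,
        agent_id: str,
        function_name: str,
        parameters: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """POST an MCPRequest-shaped payload to the server's /invoke route."""
        payload = {
            "agent_id": agent_id,
            "function_name": function_name,
            "parameters": parameters,
            "context": context,
        }
        headers = {"X-API-Key": self.api_key}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}/invoke", json=payload, headers=headers
            ) as response:
                response.raise_for_status()
                return await response.json()

# Usage sketch:
# client = MCPClient("http://localhost:8000", "your-secret-api-key")
# result = await client.invoke("agent1", "get_weather", {"city": "london"})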
Docker Compose Setup

# docker-compose.yml
version: '3'

services:
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - API_KEY=${API_KEY}
      - DATABASE_URL=${DATABASE_URL}
      - VECTOR_STORE_URL=${VECTOR_STORE_URL}
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    depends_on:
      - postgres
      - redis

  postgres:
    image: postgres:14
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    volumes:
      - postgres-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

volumes:
  postgres-data:
  redis-data:

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "mcp/server.py"]

Agent Types and Use Cases

Here are examples of different agent types and their implementations:

Retrieval Agent

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class VectorDBDeps:
    collection_name: str
    api_key: str

retrieval_agent = Agent(
    'openai:gpt-4',
    deps_type=VectorDBDeps,
    system_prompt="""
    You are a retrieval agent that helps users find relevant information.
    Use the search_documents tool to find information related to the user's query.
    Always cite your sources by including the document ID.
    """
)

@retrieval_agent.tool
async def search_documents(ctx: RunContext[VectorDBDeps], query: str, top_k: int = 3) -> List[Dict[str, Any]]:
    """
    Search for documents related to the query.

    Args:
        query: The search query
        top_k: Maximum number of results to return (default: 3)

    Returns:
        A list of documents, each containing an ID, content, and relevance score
    """
    # In a real implementation, this would query a vector database
    # This is a mock implementation for demonstration
    return [
        {"id": "doc1", "content": "Information about topic A", "score": 0.92},
        {"id": "doc2", "content": "Information about topic B", "score": 0.85},
        {"id": "doc3", "content": "Information about topic C", "score": 0.78}
    ]

# Example usage
deps = VectorDBDeps(collection_name="knowledge_base", api_key="api-key")
result = retrieval_agent.run_sync(
    "What information do we have about topic A?",
    deps=deps
)

Planning Agent

from pydantic_ai import Agent
from pydantic import BaseModel, Field
from typing import List
from enum import Enum

class TaskStatus(Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"

class Task(BaseModel):
    description: str
    status: TaskStatus = TaskStatus.TODO
    dependencies: List[int] = Field(default_factory=list)

class Plan(BaseModel):
    goal: str
    tasks: List[Task]
    estimated_completion_time: str

planning_agent = Agent(
    'anthropic:claude-3-opus-20240229',
    output_type=Plan,
    system_prompt="""
    You are a planning agent that helps users break down complex goals into actionable tasks.
    For each goal, create a detailed plan with:
    1. A clear list of tasks
    2. Dependencies between tasks (which tasks must be completed before others)
    3. An estimated completion time

    Be realistic and thorough in your planning.
    """
)

# Example usage
result = planning_agent.run_sync(
    "I want to build a personal website to showcase my portfolio."
)

plan = result.output
print(f"Goal: {plan.goal}")
print(f"Estimated completion time: {plan.estimated_completion_time}")
print("Tasks:")
for i, task in enumerate(plan.tasks):
    deps = f"Dependencies: {task.dependencies}" if task.dependencies else "No dependencies"
    print(f"{i+1}. {task.description} ({task.status.value}) - {deps}")

Conversational Agent

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class ConversationHistory:
    messages: List[Dict[str, str]]
    user_name: str
    preferences: Dict[str, str]

conversation_agent = Agent(
    'openai:gpt-4',
    deps_type=ConversationHistory,
    system_prompt="""
    You are a conversational agent that maintains context throughout a conversation.
    Be friendly, helpful, and personalized to the user's preferences.
    """
)

@conversation_agent.system_prompt
def dynamic_prompt(ctx: RunContext[ConversationHistory]) -> str:
    history = "\n".join([
        f"User: {msg['user']}\nAssistant: {msg['assistant']}"
        for msg in ctx.deps.messages[-5:]  # Last 5 messages
    ])

    return f"""
    You are chatting with {ctx.deps.user_name}.

    User preferences:
    {ctx.deps.preferences}

    Recent conversation history:
    {history}

    Maintain context from the conversation history and respond in a way that aligns with
    the user's preferences.
    """

# Example usage
history = ConversationHistory(
    messages=[
        {"user": "Hi there!", "assistant": "Hello! How can I help you today?"},
        {"user": "I'm looking for a good book to read.", "assistant": "What genres do you enjoy?"},
        {"user": "I like science fiction and fantasy.", "assistant": "Great choices! Some recommendations..."}
    ],
    user_name="Alex",
    preferences={"tone": "casual", "response_length": "detailed"}
)

response = conversation_agent.run_sync(
    "Any recommendations for books like Dune?",
    deps=history
)

# Update conversation history
history.messages.append({
    "user": "Any recommendations for books like Dune?",
    "assistant": response.output
})

RAG (Retrieval-Augmented Generation) Agent

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class RAGDependencies:
    vector_db_url: str
    api_key: str
    user_id: str

rag_agent = Agent(
    'openai:gpt-4',
    deps_type=RAGDependencies,
    system_prompt="""
    You are a RAG (Retrieval-Augmented Generation) agent.

    Process:
    1. When a user asks a question, use the search_knowledge_base tool to retrieve relevant information
    2. Synthesize the retrieved information to provide a comprehensive answer
    3. Always cite your sources by including document IDs
    4. If the retrieved information is insufficient, acknowledge the limitations

    Your goal is to provide accurate, well-sourced answers based on the retrieved information.
    """
)

@rag_agent.tool
async def search_knowledge_base(ctx: RunContext[RAGDependencies], query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Search the knowledge base for information related to the query.

    Args:
        query: The search query
        top_k: Maximum number of results to return (default: 5)

    Returns:
        A list of documents, each containing an ID, content, and relevance score
    """
    # In a real implementation, this would query a vector database
    # This is a mock implementation for demonstration
    return [
        {"id": "doc1", "content": "Detailed information about topic X...", "score": 0.95},
        {"id": "doc2", "content": "Additional context about topic X...", "score": 0.87},
        {"id": "doc3", "content": "Related information about topic Y...", "score": 0.82},
        {"id": "doc4", "content": "Historical background on topic X...", "score": 0.78},
        {"id": "doc5", "content": "Recent developments in topic X...", "score": 0.75}
    ]

# Example usage
deps = RAGDependencies(
    vector_db_url="https://api.vectordb.example.com",
    api_key="api-key",
    user_id="user123"
)

result = rag_agent.run_sync(
    "What is the latest information about topic X?",
    deps=deps
)

Workflow Orchestration Agent

from pydantic_ai import Agent, RunContext
from pydantic import BaseModel, Field
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum

class WorkflowStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

class WorkflowStep(BaseModel):
    id: str
    name: str
    status: WorkflowStatus = WorkflowStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

class Workflow(BaseModel):
    id: str
    name: str
    steps: List[WorkflowStep]
    current_step_index: int = 0
    status: WorkflowStatus = WorkflowStatus.PENDING
    context: Dict[str, Any] = Field(default_factory=dict)

@dataclass
class WorkflowDependencies:
    workflow: Workflow
    services: Dict[str, Any]

workflow_agent = Agent(
    'openai:gpt-4',
    deps_type=WorkflowDependencies,
    system_prompt="""
    You are a workflow orchestration agent responsible for managing multi-step processes.

    Your responsibilities:
    1. Execute the current step in the workflow
    2. Update the workflow state based on the results
    3. Determine the next step to execute
    4. Handle errors and provide recovery options

    Follow the workflow definition precisely and maintain the workflow context.
    """
)

@workflow_agent.tool
async def execute_step(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
    """
    Execute the current step in the workflow.

    Returns:
        The result of the step execution
    """
    workflow = ctx.deps.workflow
    current_step = workflow.steps[workflow.current_step_index]

    try:
        # Update step status
        current_step.status = WorkflowStatus.IN_PROGRESS

        # In a real implementation, this would dispatch to the appropriate service
        # This is a mock implementation for demonstration
        if current_step.id == "data_extraction":
            result = {"extracted_data": {"key1": "value1", "key2": "value2"}}
        elif current_step.id == "data_transformation":
            # Use data from previous step
            input_data = workflow.steps[workflow.current_step_index - 1].result["extracted_data"]
            result = {"transformed_data": {"processed_key1": input_data["key1"].upper()}}
        elif current_step.id == "data_loading":
            result = {"loaded_records": 42}
        else:
            raise ValueError(f"Unknown step: {current_step.id}")

        # Update step with result
        current_step.result = result
        current_step.status = WorkflowStatus.COMPLETED

        # Update workflow context with step result
        workflow.context[current_step.id] = result

        return {"success": True, "step_id": current_step.id, "result": result}

    except Exception as e:
        current_step.status = WorkflowStatus.FAILED
        current_step.error = str(e)
        return {"success": False, "step_id": current_step.id, "error": str(e)}

@workflow_agent.tool
async def advance_workflow(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
    """
    Advance the workflow to the next step.

    Returns:
        Information about the next step
    """
    workflow = ctx.deps.workflow

    # Check if we're at the end of the workflow
    if workflow.current_step_index >= len(workflow.steps) - 1:
        workflow.status = WorkflowStatus.COMPLETED
        return {"success": True, "workflow_completed": True}

    # Advance to next step
    workflow.current_step_index += 1
    next_step = workflow.steps[workflow.current_step_index]

    return {
        "success": True,
        "next_step_id": next_step.id,
        "next_step_name": next_step.name
    }

# Example usage
workflow = Workflow(
    id="data_pipeline_1",
    name="Data Processing Pipeline",
    steps=[
        WorkflowStep(id="data_extraction", name="Extract Data from Source"),
        WorkflowStep(id="data_transformation", name="Transform Data"),
        WorkflowStep(id="data_loading", name="Load Data to Destination")
    ]
)

deps = WorkflowDependencies(
    workflow=workflow,
    services={}  # In a real implementation, this would contain service instances
)

# Run the workflow orchestration
async def run_workflow():
    while workflow.status not in (WorkflowStatus.COMPLETED, WorkflowStatus.FAILED):
        # Execute current step
        result = await workflow_agent.run(
            f"Execute step: {workflow.steps[workflow.current_step_index].name}",
            deps=deps
        )

        # Check if we need to advance the workflow
        if workflow.steps[workflow.current_step_index].status == WorkflowStatus.COMPLETED:
            advance_result = await workflow_agent.run(
                "Advance to next step",
                deps=deps
            )

            if "workflow_completed" in advance_result.output:
                print("Workflow completed successfully!")
                break

    return workflow
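
Driving the loop end to end is then a one-liner (a usage sketch building on the objects above):

import asyncio

final_state = asyncio.run(run_workflow())
print(final_state.status, [step.status for step in final_state.steps])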

End-to-End Example
Here’s a complete end-to-end example of a simple customer support agent platform:

# main.py
import uvicorn
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from enum import Enum
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext

# Models
class SupportCategory(Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    ACCOUNT = "account"
    OTHER = "other"

class SupportRequest(BaseModel):
    customer_id: str
    message: str

class SupportResponse(BaseModel):
    response: str
    category: SupportCategory
    escalate: bool = False
    follow_up_actions: List[str] = Field(default_factory=list)

# Services
@dataclass
class CustomerDatabase:
    async def get_customer(self, customer_id: str) -> Dict[str, Any]:
        # Mock implementation
        return {
            "id": customer_id,
            "name": "John Doe",
            "email": "john.doe@example.com",
            "account_type": "premium",
            "billing_status": "active"
        }

    async def get_customer_tickets(self, customer_id: str) -> List[Dict[str, Any]]:
        # Mock implementation
        return [
            {"id": "ticket1", "status": "resolved", "category": "billing", "created_at": "2023-01-15"}
        ]

@dataclass
class KnowledgeBase:
    async def search(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
        # Mock implementation
        return [
            {"id": "kb1", "title": "Billing FAQ", "content": "Information about billing..."},
            {"id": "kb2", "title": "Technical Support Guide", "content": "Troubleshooting steps..."}
        ]

@dataclass
class SupportDependencies:
    customer_db: CustomerDatabase
    knowledge_base: KnowledgeBase
    customer_id: str

# Agent
support_agent = Agent(
    'openai:gpt-4',
    deps_type=SupportDependencies,
    output_type=SupportResponse,
    system_prompt="""
    You are a customer support agent for our company.

    Your responsibilities:
    1. Respond to customer inquiries in a helpful and professional manner
    2. Categorize the inquiry appropriately
    3. Determine if the issue needs to be escalated to a human agent
    4. Suggest follow-up actions if applicable

    Use the available tools to gather information about the customer and relevant
    knowledge base articles.
    """
)

@support_agent.tool
async def get_customer_info(ctx: RunContext[SupportDependencies]) -> Dict[str, Any]:
    """
    Get information about the customer.

    Returns:
        Customer information including name, email, account type, and billing status
    """
    return await ctx.deps.customer_db.get_customer(ctx.deps.customer_id)

@support_agent.tool
async def get_customer_history(ctx: RunContext[SupportDependencies]) -> List[Dict[str, Any]]:
    """
    Get the customer's support ticket history.

    Returns:
        List of previous support tickets
    """
    return await ctx.deps.customer_db.get_customer_tickets(ctx.deps.customer_id)

@support_agent.tool
async def search_knowledge_base(ctx: RunContext[SupportDependencies], query: str) -> List[Dict[str, Any]]:
    """
    Search the knowledge base for relevant articles.

    Args:
        query: The search query

    Returns:
        List of relevant knowledge base articles
    """
    return await ctx.deps.knowledge_base.search(query)

# FastAPI app
app = FastAPI(title="Customer Support Agent API")

@app.post("/support", response_model=SupportResponse)
async def handle_support_request(
    request: SupportRequest,
    authorization: Optional[str] = Header(None)
):
    # In a real implementation, validate the authorization token
    if not authorization:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Initialize services
    customer_db = CustomerDatabase()
    knowledge_base = KnowledgeBase()

    # Create dependencies
    deps = SupportDependencies(
        customer_db=customer_db,
        knowledge_base=knowledge_base,
        customer_id=request.customer_id
    )

    # Run the agent
    result = await support_agent.run(request.message, deps=deps)

    return result.output

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
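
With the service running locally, a quick smoke test might look like this (the token, port, and message are placeholders):

import requests

resp = requests.post(
    "http://localhost:8000/support",
    json={"customer_id": "customer123", "message": "Why was I charged twice this month?"},
    headers={"Authorization": "Bearer test-token"},
)
print(resp.status_code, resp.json())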

Testing and Debugging

Here are some examples of how to test and debug your Pydantic AI agents:

Unit Testing Agents

# tests/test_agents.py
import pytest
from unittest.mock import AsyncMock, patch
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext
from pydantic import BaseModel

# Models for testing
class TestOutput(BaseModel):
    result: str
    score: float

@dataclass
class TestDeps:
    value: str

# Test agent
@pytest.fixture
def test_agent():
    agent = Agent(
        'openai:gpt-4',
        deps_type=TestDeps,
        output_type=TestOutput,
        system_prompt="You are a test agent."
    )

    @agent.tool
    async def test_tool(ctx: RunContext[TestDeps], input_value: str) -> str:
        return f"Processed: {input_value} with {ctx.deps.value}"

    return agent

# Tests
@pytest.mark.asyncio
async def test_agent_run(test_agent):
    # Mock the LLM call to avoid actual API calls during testing.
    # NOTE: the patched attribute is illustrative; pydantic-ai's internals
    # differ between versions, so prefer the TestModel approach shown below.
    with patch('pydantic_ai.agent.Agent._call_llm', new_callable=AsyncMock) as mock_call:
        # Set up the mock to return a valid structured response
        mock_call.return_value = {
            "output": "This is a test response",
            "data": {"result": "Success", "score": 0.95}
        }

        # Run the agent
        result = await test_agent.run("Test input", deps=TestDeps(value="test_value"))

        # Assertions against the validated output
        assert result.output.result == "Success"
        assert result.output.score == 0.95

        # Verify the LLM was called exactly once
        mock_call.assert_called_once()

@pytest.mark.asyncio
async def test_tool_execution():
    # Tools are plain functions, so their logic can also be tested directly
    # without going through the agent; a lightweight stand-in replaces the
    # RunContext, which needs more arguments to construct for real.
    @dataclass
    class FakeContext:
        deps: TestDeps

    async def test_tool(ctx, input_value: str) -> str:
        return f"Processed: {input_value} with {ctx.deps.value}"

    result = await test_tool(FakeContext(deps=TestDeps(value="test_context")), "test_input")
    assert result == "Processed: test_input with test_context"
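
Recent releases of Pydantic AI also ship a purpose-built TestModel that fabricates output satisfying the agent’s output_type without any network calls; depending on your installed version, a test can swap it in with Agent.override. A sketch building on the fixture above:

from pydantic_ai.models.test import TestModel

@pytest.mark.asyncio
async def test_agent_with_test_model(test_agent):
    # Replace the real model with TestModel for the duration of the test
    with test_agent.override(model=TestModel()):
        result = await test_agent.run("Test input", deps=TestDeps(value="test_value"))
        assert isinstance(result.output.score, float)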

Integration Testing

# tests/test_integration.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock

from main import app, SupportResponse, SupportCategory

# Test client
@pytest.fixture
def client():
    return TestClient(app)

# Tests
def test_support_endpoint(client):
    # Mock the agent run method to avoid actual LLM calls
    with patch('pydantic_ai.agent.Agent.run', new_callable=AsyncMock) as mock_run:
        # Set up the mock to return a valid structured result
        mock_run.return_value.output = SupportResponse(
            response="I can help with your billing issue.",
            category=SupportCategory.BILLING,
            escalate=False,
            follow_up_actions=["Check billing status"]
        )

        # Make the request
        response = client.post(
            "/support",
            json={"customer_id": "customer123", "message": "I have a billing question"},
            headers={"Authorization": "Bearer test-token"}
        )

        # Assertions
        assert response.status_code == 200
        data = response.json()
        assert data["response"] == "I can help with your billing issue."
        assert data["category"] == "billing"
        assert data["escalate"] is False
        assert "Check billing status" in data["follow_up_actions"]

Debugging Tips

1. Enable Verbose Logging:

import logging
logging.basicConfig(level=logging.DEBUG)

# Create a logger for your application
logger = logging.getLogger("pydantic_ai_app")
logger.setLevel(logging.DEBUG)

# Add a file handler
file_handler = logging.FileHandler("app.log")
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Use in your agent (Dependencies and process_input are placeholders for
# your own deps type and business logic)
@agent.tool
async def debug_tool(ctx: RunContext[Dependencies], input_value: str) -> str:
    logger.debug(f"Tool called with input: {input_value}")
    result = process_input(input_value)
    logger.debug(f"Tool result: {result}")
    return result

2. Inspect Agent Execution:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4', system_prompt="Debug agent")

# Run the agent
result = agent.run_sync("Test input")

# Inspect the full message exchange recorded during the run: system prompt,
# user prompt, tool calls and returns, and model responses
print(result.all_messages())

# For deeper execution tracing, the documented route is instrumentation
# (e.g. Pydantic Logfire / OpenTelemetry) rather than a built-in graph dump;
# check your installed version's debugging docs.

Deployment Considerations

Environment Variables

# config/settings.py
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # API keys
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
    anthropic_api_key: str = os.getenv("ANTHROPIC_API_KEY", "")
    google_api_key: str = os.getenv("GOOGLE_API_KEY", "")

    # Database
    database_url: str = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/db")

    # MCP Server
    mcp_server_host: str = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
    mcp_server_port: int = int(os.getenv("MCP_SERVER_PORT", "8000"))
    mcp_api_key: str = os.getenv("MCP_API_KEY", "default-key")

    # Logging
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    class Config:
        env_file = ".env"

# Create settings instance
settings = Settings()

Containerization

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pydantic-ai-platform
  labels:
    app: pydantic-ai-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pydantic-ai-platform
  template:
    metadata:
      labels:
        app: pydantic-ai-platform
    spec:
      containers:
      - name: pydantic-ai-platform
        image: your-registry/pydantic-ai-platform:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-api-key
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: database-credentials
              key: url
        resources:
          limits:
            cpu: "1"
            memory: "1Gi"
          requests:
            cpu: "500m"
            memory: "512Mi"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: pydantic-ai-platform
spec:
  selector:
    app: pydantic-ai-platform
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP

Appendix: Advanced Patterns

Chained Agents with Dependency Injection

from pydantic_ai import Agent
from dataclasses import dataclass
from typing import List, Dict, Any
from pydantic import BaseModel

# Models
class ResearchTopic(BaseModel):
    title: str
    keywords: List[str]
    description: str

class ResearchOutline(BaseModel):
    sections: List[Dict[str, Any]]
    references: List[str]

class ResearchReport(BaseModel):
    title: str
    introduction: str
    sections: List[Dict[str, str]]
    conclusion: str
    references: List[str]

# Dependencies
@dataclass
class ResearchDeps:
    api_key: str
    user_id: str

# Agents
topic_agent = Agent(
    'openai:gpt-4',
    deps_type=ResearchDeps,
    output_type=ResearchTopic,
    system_prompt="You help users define research topics."
)

outline_agent = Agent(
    'anthropic:claude-3-opus-20240229',
    deps_type=ResearchDeps,
    output_type=ResearchOutline,
    system_prompt="You create detailed outlines for research reports."
)

report_agent = Agent(
    'openai:gpt-4',
    deps_type=ResearchDeps,
    output_type=ResearchReport,
    system_prompt="You write comprehensive research reports based on outlines."
)

# Workflow function
async def generate_research_report(query: str, deps: ResearchDeps) -> ResearchReport:
    # Step 1: Generate research topic
    topic_result = await topic_agent.run(
        f"Generate a research topic based on: {query}",
        deps=deps
    )
    topic = topic_result.output

    # Step 2: Create outline based on topic
    outline_prompt = f"""
    Create a detailed outline for a research report on:
    Title: {topic.title}
    Description: {topic.description}
    Keywords: {', '.join(topic.keywords)}
    """

    outline_result = await outline_agent.run(outline_prompt, deps=deps)
    outline = outline_result.output

    # Step 3: Write full report based on outline
    report_prompt = f"""
    Write a comprehensive research report based on this outline:
    Title: {topic.title}

    Sections:
    {outline.sections}

    References to include:
    {outline.references}
    """

    report_result = await report_agent.run(report_prompt, deps=deps)
    return report_result.output
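
Kicking off the chain (a usage sketch; the query and credentials are placeholders):

import asyncio

deps = ResearchDeps(api_key="provider-api-key", user_id="user123")
report = asyncio.run(generate_research_report("the impact of LLM agents on software testing", deps))
print(report.title)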

Streaming Responses

from pydantic_ai import Agent
import asyncio

streaming_agent = Agent(
    'openai:gpt-4',
    system_prompt="You are a storytelling agent that creates engaging narratives."
)

# Streaming is exposed through run_stream(), an async context manager
# (check your installed pydantic-ai version for the exact streaming API)
async def stream_story():
    async with streaming_agent.run_stream(
        "Tell me a short story about a robot learning to paint."
    ) as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end="", flush=True)

# Run the async function
asyncio.run(stream_story())

Custom Model Provider

# NOTE: this example is schematic. The abstract model interface lives in
# pydantic_ai.models, and its class/method names change between releases;
# check the installed version's docs before implementing a real provider.
from pydantic_ai import Agent
from pydantic_ai.models.base import BaseModel, ModelResponse  # illustrative import path
from typing import Dict, Any, Optional, List
import aiohttp

class CustomLLMModel(BaseModel):
    """Custom LLM model implementation (schematic)."""

    def __init__(self, model_name: str, api_key: str, api_url: str):
        super().__init__(model_name)
        self.api_key = api_key
        self.api_url = api_url

    async def generate(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        stop: Optional[List[str]] = None,
        **kwargs
    ) -> ModelResponse:
        """Generate a response from an OpenAI-style chat completions endpoint."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stop": stop,
            **kwargs
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.api_url, headers=headers, json=payload) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API error: {response.status} - {error_text}")

                data = await response.json()

        return ModelResponse(
            content=data["choices"][0]["message"]["content"],
            model=self.model_name,
            usage={
                "prompt_tokens": data["usage"]["prompt_tokens"],
                "completion_tokens": data["usage"]["completion_tokens"],
                "total_tokens": data["usage"]["total_tokens"]
            },
            raw_response=data
        )

# Using the custom model: Agent accepts a model instance in place of a
# 'provider:name' string
custom_model = CustomLLMModel(
    model_name="custom-llm",
    api_key="your-api-key",
    api_url="https://api.custom-llm-provider.com/v1/chat/completions"
)

agent = Agent(
    custom_model,
    system_prompt="You are a helpful assistant."
)

response = agent.run_sync("Hello, how are you?")
print(response.output)

This implementation guide provides a comprehensive set of examples and guidelines
for building an AI agent platform using Pydantic AI. By following these examples,
developers can create sophisticated agent-based applications that leverage the power
of large language models while maintaining type safety, modularity, and
production-grade quality.
