A Gleam SDK for the Anthropic Messages API with a built-in agentic tool-use loop, leveraging BEAM concurrency for parallel tool execution and fault isolation.
claude_sdk provides a typed Gleam interface to the Anthropic Claude API. Beyond simple message sending, it includes a full agent loop that automatically handles multi-turn tool-use conversations: send a prompt, let the model call tools, execute those tools concurrently on the BEAM, feed results back, and repeat until the model produces a final response.
Key features:
- Type-safe API for the Anthropic Messages endpoint
- Automatic agent loop with configurable iteration limits
- Concurrent tool execution -- each tool call runs in its own BEAM process
- Event streaming via callbacks or OTP actor message passing
- SSE streaming support for the Messages API
- Extended thinking support
- Builder-pattern configuration
- Gleam >= 1.0
- Erlang/OTP >= 27 (required by
gleam_jsonv3) - An Anthropic API key (set as
ANTHROPIC_API_KEYor passed directly)
Add claude to your Gleam project:
gleam add claudeimport claude
import gleam/io
const weather_schema = "{
\"type\": \"object\",
\"properties\": {
\"location\": {
\"type\": \"string\",
\"description\": \"The city and state, e.g. San Francisco, CA\"
}
},
\"required\": [\"location\"]
}"
pub fn main() {
// Create a client from the ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()
// Define a tool
let weather_tool =
claude.tool("get_weather", "Get the current weather for a location", weather_schema)
// Handle tool calls
let handler = fn(name, _input) {
case name {
"get_weather" ->
Ok("{\"temperature\": 72, \"condition\": \"sunny\"}")
_ -> Error("Unknown tool: " <> name)
}
}
// Run the agent loop
case claude.run(client, "What's the weather in San Francisco?", [weather_tool], handler) {
Ok(result) -> io.println(claude.result_text(result))
Error(_err) -> io.println("Agent error")
}
}Create a client with an API key directly or from the environment:
// Direct API key
let client = claude.new("sk-ant-...")
// From ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()The client defaults to:
- Base URL:
https://api.anthropic.com - Model:
claude-sonnet-4-5-20250929 - Max tokens:
4096
Override defaults with the builder functions on client.Config:
import claude/client
let client =
claude.new("sk-ant-...")
|> client.with_model("claude-opus-4-5-20250929")
|> client.with_max_tokens(8192)
|> client.with_base_url("https://my-proxy.example.com")Tools are defined with a name, description, and a JSON Schema string for the input parameters:
let tool = claude.tool(
"get_weather",
"Get the current weather for a location",
"{\"type\": \"object\", \"properties\": {\"location\": {\"type\": \"string\"}}, \"required\": [\"location\"]}",
)The input_schema is a raw JSON string that conforms to JSON Schema. The model uses this schema to understand what arguments to provide.
A tool handler is a function with the signature:
fn(String, String) -> Result(String, String)The first argument is the tool name, the second is the JSON string of input arguments. Return Ok(json_string) on success or Error(error_message) on failure. The agent loop dispatches all tool calls through a single handler function:
let handler = fn(name, input) {
case name {
"get_weather" -> Ok("{\"temperature\": 72}")
"calculator" -> handle_calculator(input)
_ -> Error("Unknown tool: " <> name)
}
}The primary entry point is claude.run:
pub fn run(
client: client.Config,
prompt: String,
tools: List(tool.Tool),
handler: ToolHandler,
) -> Result(AgentResult, AgentError)AgentResult contains:
final_message-- the model's lastMessageresponsemessages-- the full conversation history asList(MessageParam)iterations-- how many API round-trips were madetotal_input_tokens/total_output_tokens-- cumulative token usage
AgentError is one of:
ApiCallFailed(ApiError)-- an API request failedMaxIterationsReached(messages, iterations)-- the iteration limit was hit
- The prompt is sent to the Messages API
- The SDK checks the response's
stop_reason - If
stop_reasonistool_use, it extracts allToolUsecontent blocks - Each tool call is executed concurrently in its own BEAM process
- Tool results are collected (with per-tool timeout) and assembled into a
tool_resultmessage - The results are appended to the conversation history and sent back to the API
- Steps 2--6 repeat until the model stops with
end_turn,max_tokens, or another non-tool-use reason, or untilmax_iterationsis reached
For full control, build an AgentConfig and use claude.run_with_config:
import claude
import claude/agent/config
let assert Ok(client) = claude.from_env()
let tools = [claude.tool("get_weather", "Get weather", schema)]
let cfg =
config.new(client: client, tools: tools, tool_handler: handler)
|> config.with_system("You are a helpful weather assistant.")
|> config.with_model("claude-opus-4-5-20250929")
|> config.with_max_tokens(2048)
|> config.with_max_iterations(5)
|> config.with_thinking(10_000)
|> config.with_tool_timeout(60_000)
|> config.with_tool_choice(tool.Auto(disable_parallel: False))
case claude.run_with_config(cfg, "How's the weather in Tokyo?") {
Ok(result) -> io.println(claude.result_text(result))
Error(_) -> io.println("Error")
}Available config builder functions:
| Function | Default | Description |
|---|---|---|
with_system(String) |
None |
System prompt |
with_model(String) |
Client default | Model ID |
with_max_tokens(Int) |
Client default | Max output tokens per API call |
with_max_iterations(Int) |
10 |
Maximum agent loop iterations |
with_thinking(Int) |
None |
Extended thinking token budget |
with_tool_timeout(Int) |
30_000 |
Per-tool execution timeout in ms |
with_tool_choice(ToolChoice) |
None (API default) |
Tool selection strategy |
To continue an existing conversation or provide pre-built message history:
import claude/agent
import claude/agent/config
import claude/types/message
import claude/types/content
let cfg = config.new(client: client, tools: tools, tool_handler: handler)
// Build a message history manually
let messages = [
message.new_user("My name is Alice."),
message.new_assistant_blocks([content.TextParam(text: "Hello Alice!")]),
message.new_user("What's my name?"),
]
case agent.run_with_messages(cfg, messages) {
Ok(result) -> io.println(claude.result_text(result))
Error(_) -> io.println("Error")
}When the model returns multiple tool calls in a single response, the SDK executes them concurrently. Each tool call runs in its own BEAM process via tool_runner.execute_concurrent. Results are collected with a per-tool timeout (default 30 seconds). If a tool exceeds the timeout, it returns Error("Tool execution timed out") for that tool while other results are unaffected.
This happens automatically -- no additional configuration is needed. The concurrency is a natural fit for the BEAM: each tool runs in a lightweight process with its own heap and fault isolation.
The claude/agent/actor module provides two ways to observe agent progress in real time.
Synchronous callback-based events:
import claude/agent/actor
actor.run_with_events(cfg, "What's 25 * 4?", fn(event) {
case event {
actor.Started(id) -> io.println("Started: " <> id)
actor.AssistantResponse(_msg) -> io.println("Got response")
actor.ToolExecuting(name, id) -> io.println("Executing: " <> name)
actor.ToolCompleted(id, result) -> io.println("Tool done: " <> id)
actor.Done(result) -> io.println("Agent finished")
actor.Failed(error) -> io.println("Agent failed")
}
})Asynchronous OTP actor pattern:
The actor.start function spawns the agent in a separate BEAM process and sends events to a Subject that the caller can receive from:
import gleam/erlang/process
import claude/agent/actor
let events = process.new_subject()
let _pid = actor.start(cfg, "Hello", events)
// Receive events from the agent's mailbox
case process.receive(events, 30_000) {
Ok(actor.Done(result)) -> io.println(claude.result_text(result))
Ok(actor.Failed(err)) -> io.println("Failed")
_ -> io.println("Timeout or other event")
}The AgentEvent type covers the full lifecycle:
| Event | Description |
|---|---|
Started(session_id) |
Agent loop began |
AssistantResponse(message) |
Model returned a response |
ToolExecuting(tool_name, tool_id) |
A tool call is about to run |
ToolCompleted(tool_id, result) |
A tool call finished |
Done(result) |
Agent completed successfully |
Failed(error) |
Agent encountered an error |
For one-shot messages without the agent loop:
// Simple helper
case claude.message(client, "What is 2 + 2?") {
Ok(msg) -> io.println(claude.text_content(msg))
Error(_) -> io.println("API error")
}For full control over the Messages API call:
import claude/messages
import claude/types/message
import gleam/option.{None, Some}
case messages.create(
config: client,
model: Some("claude-opus-4-5-20250929"),
max_tokens: Some(1024),
messages: [message.new_user("Explain monads.")],
tools: [],
system: Some("You are a Haskell expert."),
tool_choice: None,
thinking: None,
) {
Ok(msg) -> // msg is a Message
Error(api_error) -> // api_error is an ApiError
}The SDK supports streaming responses via messages.create_stream. Note that gleam_httpc buffers the full response, so events are parsed from the complete SSE payload after the request completes:
import claude/messages
import claude/streaming.{TextDelta, ContentBlockDelta, MessageStop}
import gleam/option.{None}
case messages.create_stream(
config: client,
model: None,
max_tokens: None,
messages: [message.new_user("Tell me a story.")],
tools: [],
system: None,
tool_choice: None,
thinking: None,
) {
Ok(events) ->
list.each(events, fn(event) {
case event {
ContentBlockDelta(index: _, delta: TextDelta(text)) ->
io.print(text)
MessageStop -> io.println("\n[done]")
_ -> Nil
}
})
Error(_) -> io.println("Stream error")
}StreamEvent variants: MessageStart, ContentBlockStart, ContentBlockDelta, ContentBlockStop, MessageDelta, MessageStop, Ping, UnknownEvent.
| Function | Description |
|---|---|
new(api_key) -> Config |
Create client with API key |
from_env() -> Result(Config, EnvError) |
Create client from ANTHROPIC_API_KEY |
tool(name, description, input_schema) -> Tool |
Define a tool |
run(client, prompt, tools, handler) -> Result(AgentResult, AgentError) |
Run the agent loop |
run_with_config(config, prompt) -> Result(AgentResult, AgentError) |
Run with custom config |
message(client, prompt) -> Result(Message, ApiError) |
Send a single message |
text_content(message) -> String |
Extract text from a Message |
result_text(result) -> String |
Extract text from an AgentResult |
version() -> String |
SDK version string |
| Function | Description |
|---|---|
new(api_key) -> Config |
Create client config |
with_model(config, model) -> Config |
Set default model |
with_base_url(config, url) -> Config |
Set API base URL |
with_max_tokens(config, n) -> Config |
Set default max tokens |
| Function | Description |
|---|---|
run(config, prompt) -> Result(AgentResult, AgentError) |
Run agent loop |
run_with_messages(config, messages) -> Result(AgentResult, AgentError) |
Run with existing history |
extract_tool_calls(content) -> List(ContentBlock) |
Filter tool-use blocks |
build_tool_results_message(results) -> MessageParam |
Build tool results message |
| Function | Description |
|---|---|
new(client, tools, tool_handler) -> AgentConfig |
Create agent config |
with_system(config, system) -> AgentConfig |
Set system prompt |
with_model(config, model) -> AgentConfig |
Set model |
with_max_tokens(config, n) -> AgentConfig |
Set max tokens |
with_max_iterations(config, n) -> AgentConfig |
Set iteration limit |
with_thinking(config, budget) -> AgentConfig |
Enable extended thinking |
with_tool_timeout(config, ms) -> AgentConfig |
Set tool timeout |
with_tool_choice(config, choice) -> AgentConfig |
Set tool choice strategy |
| Function | Description |
|---|---|
run_with_events(config, prompt, on_event) -> Result(AgentResult, AgentError) |
Synchronous with event callbacks |
start(config, prompt, caller) -> Pid |
Async agent in a new BEAM process |
| Function | Description |
|---|---|
execute_concurrent(tool_calls, handler, timeout_ms) -> List(ToolResult) |
Run tools concurrently |
| Function | Description |
|---|---|
create(...) -> Result(Message, ApiError) |
Send a Messages API request |
create_simple(config, message) -> Result(Message, ApiError) |
Simple one-shot message |
create_stream(...) -> Result(List(StreamEvent), ApiError) |
Streaming Messages API request |
build_request(...) -> Request(String) |
Build HTTP request without sending |
build_stream_request(...) -> Request(String) |
Build streaming HTTP request |
| Function | Description |
|---|---|
parse_sse(text) -> List(StreamEvent) |
Parse SSE payload into events |
parse_event(event_type, data) -> StreamEvent |
Parse a single SSE event |
| Type | Description |
|---|---|
Tool(name, description, input_schema) |
Tool definition |
ToolChoice |
Auto, Any, SpecificTool(name), NoTools -- each with disable_parallel flag |
| Type | Description |
|---|---|
Message |
Full API response message with content, usage, stop_reason |
MessageParam |
Message sent to the API (role + content) |
StopReason |
EndTurn, ToolUseStop, MaxTokens, StopSequence, PauseTurn, Refusal |
Usage |
Token counts (input, output, cache creation, cache read) |
| Type | Description |
|---|---|
ContentBlock |
Response content: Text, Thinking, ToolUse, ServerToolUse, WebSearchResult |
ContentBlockParam |
Request content: TextParam, ImageParam, DocumentParam, ToolUseParam, ToolResultParam |
| Type | Description |
|---|---|
ApiError |
AuthenticationError, RateLimitError, BadRequestError, NotFoundError, ServerError, ConnectionError, TimeoutError, UnknownError |
claude.gleam -- Top-level public API (facade)
claude/
client.gleam -- Client config (API key, model, base URL)
messages.gleam -- HTTP layer: build and send Messages API requests
streaming.gleam -- SSE parser for streaming responses
agent.gleam -- Core agent loop (recursive, synchronous)
agent/
config.gleam -- AgentConfig type and builder functions
tool_runner.gleam -- Concurrent tool execution via BEAM processes
actor.gleam -- Event-emitting agent loop + async OTP actor
types/
content.gleam -- ContentBlock and ContentBlockParam types
message.gleam -- Message, MessageParam, Role, StopReason, Usage
tool.gleam -- Tool and ToolChoice types
error.gleam -- ApiError type and HTTP status mapping
json/
encode.gleam -- JSON encoding for API request bodies
decode.gleam -- JSON decoding for API response bodies
The architecture takes advantage of the BEAM in three ways:
-
Concurrent tool execution -- When the model returns N tool calls,
tool_runnerspawns N lightweight BEAM processes that run in parallel. Results are collected with per-tool timeouts usingprocess.receive. This is significantly faster than sequential execution for I/O-bound tools. -
Fault isolation -- Each tool runs in its own process with its own heap. If a tool handler crashes or times out, it does not affect the agent loop or other tool executions. The agent simply receives a timeout error for that specific tool.
-
Actor-based event streaming --
actor.startspawns the entire agent loop in a new BEAM process and communicates results viaSubjectmessage passing, fitting naturally into OTP supervision trees and concurrent application architectures.
Build the project:
gleam buildRun tests:
gleam testFormat code:
gleam formatRun an example (add the example file to src/ first):
ANTHROPIC_API_KEY=sk-ant-... gleam run -m weather_agentMIT