pothos-dev/claude-sdk-gleam

claude-sdk-gleam

A Gleam SDK for the Anthropic Messages API with a built-in agentic tool-use loop, leveraging BEAM concurrency for parallel tool execution and fault isolation.

What this is

claude_sdk provides a typed Gleam interface to the Anthropic Claude API. Beyond simple message sending, it includes a full agent loop that automatically handles multi-turn tool-use conversations: send a prompt, let the model call tools, execute those tools concurrently on the BEAM, feed results back, and repeat until the model produces a final response.

Key features:

  • Type-safe API for the Anthropic Messages endpoint
  • Automatic agent loop with configurable iteration limits
  • Concurrent tool execution -- each tool call runs in its own BEAM process
  • Event streaming via callbacks or OTP actor message passing
  • SSE streaming support for the Messages API
  • Extended thinking support
  • Builder-pattern configuration

Requirements

  • Gleam >= 1.0
  • Erlang/OTP >= 27 (required by gleam_json v3)
  • An Anthropic API key (set as ANTHROPIC_API_KEY or passed directly)

Installation

Add claude to your Gleam project:

gleam add claude

Quick start

import claude
import gleam/io

const weather_schema = "{
  \"type\": \"object\",
  \"properties\": {
    \"location\": {
      \"type\": \"string\",
      \"description\": \"The city and state, e.g. San Francisco, CA\"
    }
  },
  \"required\": [\"location\"]
}"

pub fn main() {
  // Create a client from the ANTHROPIC_API_KEY environment variable
  let assert Ok(client) = claude.from_env()

  // Define a tool
  let weather_tool =
    claude.tool("get_weather", "Get the current weather for a location", weather_schema)

  // Handle tool calls
  let handler = fn(name, _input) {
    case name {
      "get_weather" ->
        Ok("{\"temperature\": 72, \"condition\": \"sunny\"}")
      _ -> Error("Unknown tool: " <> name)
    }
  }

  // Run the agent loop
  case claude.run(client, "What's the weather in San Francisco?", [weather_tool], handler) {
    Ok(result) -> io.println(claude.result_text(result))
    Error(_err) -> io.println("Agent error")
  }
}

Core concepts

Client configuration

Create a client with an API key directly or from the environment:

// Direct API key
let client = claude.new("sk-ant-...")

// From ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()

The client defaults to:

  • Base URL: https://api.anthropic.com
  • Model: claude-sonnet-4-5-20250929
  • Max tokens: 4096

Override defaults with the builder functions on client.Config:

import claude/client

let client =
  claude.new("sk-ant-...")
  |> client.with_model("claude-opus-4-5-20251101")
  |> client.with_max_tokens(8192)
  |> client.with_base_url("https://my-proxy.example.com")

Defining tools

Tools are defined with a name, description, and a JSON Schema string for the input parameters:

let tool = claude.tool(
  "get_weather",
  "Get the current weather for a location",
  "{\"type\": \"object\", \"properties\": {\"location\": {\"type\": \"string\"}}, \"required\": [\"location\"]}",
)

The input_schema is a raw JSON string that conforms to JSON Schema. The model uses this schema to understand what arguments to provide.
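
Hand-escaping JSON inside a Gleam string literal is easy to get wrong. As a sketch, the same schema could be assembled with gleam_json (listed under Requirements) and serialized with json.to_string. The function name weather_schema is illustrative and not part of the SDK.

```gleam
import gleam/json

// Illustrative helper (not part of the SDK): builds the get_weather input
// schema as a JSON string without manual escaping.
pub fn weather_schema() -> String {
  json.object([
    #("type", json.string("object")),
    #(
      "properties",
      json.object([
        #("location", json.object([#("type", json.string("string"))])),
      ]),
    ),
    #("required", json.array(["location"], of: json.string)),
  ])
  |> json.to_string
}
```

The resulting string can be passed as the third argument to claude.tool in place of the literal above.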

Tool handlers

A tool handler is a function with the signature:

fn(String, String) -> Result(String, String)

The first argument is the tool name, the second is the JSON string of input arguments. Return Ok(json_string) on success or Error(error_message) on failure. The agent loop dispatches all tool calls through a single handler function:

let handler = fn(name, input) {
  case name {
    "get_weather" -> Ok("{\"temperature\": 72}")
    "calculator" -> handle_calculator(input)
    _ -> Error("Unknown tool: " <> name)
  }
}
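
The handle_calculator function referenced above is not defined in this README. One possible shape, assuming a hypothetical calculator tool whose input has two integer fields named a and b, decodes the JSON input with gleam_json and gleam/dynamic/decode and returns a JSON string:

```gleam
import gleam/dynamic/decode
import gleam/json

// Hypothetical calculator tool: expects {"a": Int, "b": Int} as input and
// returns {"sum": Int}. The field names are assumptions for illustration.
pub fn handle_calculator(input: String) -> Result(String, String) {
  let decoder = {
    use a <- decode.field("a", decode.int)
    use b <- decode.field("b", decode.int)
    decode.success(#(a, b))
  }
  case json.parse(from: input, using: decoder) {
    Ok(#(a, b)) ->
      Ok(json.to_string(json.object([#("sum", json.int(a + b))])))
    // Returning Error lets the model see what went wrong and retry.
    Error(_) -> Error("calculator: expected integer fields a and b")
  }
}
```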

Running the agent loop

The primary entry point is claude.run:

pub fn run(
  client: client.Config,
  prompt: String,
  tools: List(tool.Tool),
  handler: ToolHandler,
) -> Result(AgentResult, AgentError)

AgentResult contains:

  • final_message -- the model's last Message response
  • messages -- the full conversation history as List(MessageParam)
  • iterations -- how many API round-trips were made
  • total_input_tokens / total_output_tokens -- cumulative token usage

AgentError is one of:

  • ApiCallFailed(ApiError) -- an API request failed
  • MaxIterationsReached(messages, iterations) -- the iteration limit was hit

What happens under the hood

  1. The prompt is sent to the Messages API
  2. The SDK checks the response's stop_reason
  3. If stop_reason is tool_use, it extracts all ToolUse content blocks
  4. Each tool call is executed concurrently in its own BEAM process
  5. Tool results are collected (with per-tool timeout) and assembled into a tool_result message
  6. The results are appended to the conversation history and sent back to the API
  7. Steps 2--6 repeat until the model stops with end_turn, max_tokens, or another non-tool-use reason, or until max_iterations is reached
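
The steps above can be sketched as a small recursive function. This is not the SDK's implementation: Reply, Outcome, and agent_loop are stand-in names, the history is simplified to a list of strings, and tools run sequentially here to keep the example short.

```gleam
import gleam/list

// Stand-in for a model response: either tool calls (name, input) or a final answer.
pub type Reply {
  ToolUse(calls: List(#(String, String)))
  EndTurn(text: String)
}

// Stand-in for the loop's result.
pub type Outcome {
  Finished(text: String, iterations: Int)
  MaxIterationsReached(iterations: Int)
}

pub fn agent_loop(
  call_model: fn(List(String)) -> Reply,
  handler: fn(String, String) -> Result(String, String),
  history: List(String),
  iteration: Int,
  max_iterations: Int,
) -> Outcome {
  case iteration >= max_iterations {
    True -> MaxIterationsReached(iteration)
    False ->
      case call_model(history) {
        // A non-tool stop reason ends the loop.
        EndTurn(text) -> Finished(text, iteration + 1)
        ToolUse(calls) -> {
          // Run every requested tool and turn failures into error text.
          let results =
            list.map(calls, fn(call) {
              let #(name, input) = call
              case handler(name, input) {
                Ok(output) -> output
                Error(message) -> "error: " <> message
              }
            })
          // Append the results to the history and ask the model again.
          agent_loop(
            call_model,
            handler,
            list.append(history, results),
            iteration + 1,
            max_iterations,
          )
        }
      }
  }
}
```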

Advanced usage

Custom AgentConfig with builder pattern

For full control, build an AgentConfig and use claude.run_with_config:

import claude
import claude/agent/config
import claude/types/tool
import gleam/io

let assert Ok(client) = claude.from_env()

let tools = [claude.tool("get_weather", "Get weather", schema)]

let cfg =
  config.new(client: client, tools: tools, tool_handler: handler)
  |> config.with_system("You are a helpful weather assistant.")
  |> config.with_model("claude-opus-4-5-20251101")
  |> config.with_max_tokens(2048)
  |> config.with_max_iterations(5)
  |> config.with_thinking(10_000)
  |> config.with_tool_timeout(60_000)
  |> config.with_tool_choice(tool.Auto(disable_parallel: False))

case claude.run_with_config(cfg, "How's the weather in Tokyo?") {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}

Available config builder functions:

| Function | Default | Description |
| --- | --- | --- |
| with_system(String) | None | System prompt |
| with_model(String) | Client default | Model ID |
| with_max_tokens(Int) | Client default | Max output tokens per API call |
| with_max_iterations(Int) | 10 | Maximum agent loop iterations |
| with_thinking(Int) | None | Extended thinking token budget |
| with_tool_timeout(Int) | 30_000 | Per-tool execution timeout in ms |
| with_tool_choice(ToolChoice) | None (API default) | Tool selection strategy |
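
For readers new to the pattern, a builder in Gleam is usually a set of functions that take a record and return an updated copy, which is what makes the |> chains above work. The sketch below uses a stand-in Config type with three of the defaults from the table. It is not the SDK's AgentConfig, whose fields are not shown in this README.

```gleam
import gleam/option.{type Option, None, Some}

// Stand-in record for illustration; the real AgentConfig has more fields.
pub type Config {
  Config(system: Option(String), max_iterations: Int, tool_timeout: Int)
}

// Defaults taken from the table above.
pub fn new() -> Config {
  Config(system: None, max_iterations: 10, tool_timeout: 30_000)
}

// Each builder returns a copy with one field replaced.
pub fn with_system(config: Config, system: String) -> Config {
  Config(..config, system: Some(system))
}

pub fn with_max_iterations(config: Config, n: Int) -> Config {
  Config(..config, max_iterations: n)
}
```

Because every builder takes the config as its first argument, calls compose with the pipe operator: new() |> with_system("...") |> with_max_iterations(5).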

Continuing conversations with agent.run_with_messages

To continue an existing conversation or provide pre-built message history:

import claude/agent
import claude/agent/config
import claude/types/message
import claude/types/content

let cfg = config.new(client: client, tools: tools, tool_handler: handler)

// Build a message history manually
let messages = [
  message.new_user("My name is Alice."),
  message.new_assistant_blocks([content.TextParam(text: "Hello Alice!")]),
  message.new_user("What's my name?"),
]

case agent.run_with_messages(cfg, messages) {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}

Concurrent tool execution

When the model returns multiple tool calls in a single response, the SDK executes them concurrently. Each tool call runs in its own BEAM process via tool_runner.execute_concurrent. Results are collected with a per-tool timeout (default 30 seconds). If a tool exceeds the timeout, it returns Error("Tool execution timed out") for that tool while other results are unaffected.

This happens automatically -- no additional configuration is needed. The concurrency is a natural fit for the BEAM: each tool runs in a lightweight process with its own heap and fault isolation.
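
To make the mechanism concrete, here is a minimal sketch of one-process-per-tool execution with a per-tool timeout. It is not the SDK's tool_runner: run_all is a hypothetical name, tool calls are simplified to (name, input) tuples, and process.spawn_unlinked assumes gleam_erlang 1.0 or later.

```gleam
import gleam/erlang/process
import gleam/list

pub fn run_all(
  calls: List(#(String, String)),
  handler: fn(String, String) -> Result(String, String),
  timeout_ms: Int,
) -> List(Result(String, String)) {
  // Spawn one unlinked process per call; each reports to its own subject.
  let pending =
    list.map(calls, fn(call) {
      let #(name, input) = call
      let reply = process.new_subject()
      let _pid =
        process.spawn_unlinked(fn() {
          process.send(reply, handler(name, input))
        })
      reply
    })
  // Collect in the original order. A slow or crashed tool only produces
  // its own timeout error; the other results are returned as usual.
  list.map(pending, fn(reply) {
    case process.receive(reply, timeout_ms) {
      Ok(result) -> result
      Error(Nil) -> Error("Tool execution timed out")
    }
  })
}
```

In this sketch the timeout is applied to each receive in turn, so the worst-case total wait is the timeout multiplied by the number of calls. A production version would likely track a shared deadline instead.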

Event streaming with actor.run_with_events and actor.start

The claude/agent/actor module provides two ways to observe agent progress in real time.

Synchronous callback-based events:

import claude/agent/actor

actor.run_with_events(cfg, "What's 25 * 4?", fn(event) {
  case event {
    actor.Started(id) -> io.println("Started: " <> id)
    actor.AssistantResponse(_msg) -> io.println("Got response")
    actor.ToolExecuting(name, _id) -> io.println("Executing: " <> name)
    actor.ToolCompleted(id, _result) -> io.println("Tool done: " <> id)
    actor.Done(_result) -> io.println("Agent finished")
    actor.Failed(_error) -> io.println("Agent failed")
  }
})

Asynchronous OTP actor pattern:

The actor.start function spawns the agent in a separate BEAM process and sends events to a Subject that the caller can receive from:

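import claude
import claude/agent/actor
import gleam/erlang/process
import gleam/io

let events = process.new_subject()
let _pid = actor.start(cfg, "Hello", events)
await_result(events)

// Keep receiving from the caller's subject until a terminal event arrives.
// Started, AssistantResponse, and tool events are expected before Done or
// Failed, so a single receive would normally see Started, not the result.
fn await_result(events) {
  case process.receive(events, 30_000) {
    Ok(actor.Done(result)) -> io.println(claude.result_text(result))
    Ok(actor.Failed(_err)) -> io.println("Failed")
    Ok(_other) -> await_result(events)
    Error(Nil) -> io.println("Timed out waiting for an event")
  }
}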

The AgentEvent type covers the full lifecycle:

| Event | Description |
| --- | --- |
| Started(session_id) | Agent loop began |
| AssistantResponse(message) | Model returned a response |
| ToolExecuting(tool_name, tool_id) | A tool call is about to run |
| ToolCompleted(tool_id, result) | A tool call finished |
| Done(result) | Agent completed successfully |
| Failed(error) | Agent encountered an error |

Direct Messages API access

For one-shot messages without the agent loop:

// Simple helper
case claude.message(client, "What is 2 + 2?") {
  Ok(msg) -> io.println(claude.text_content(msg))
  Error(_) -> io.println("API error")
}

For full control over the Messages API call:

import claude
import claude/messages
import claude/types/message
import gleam/io
import gleam/option.{None, Some}

case messages.create(
  config: client,
  model: Some("claude-opus-4-5-20251101"),
  max_tokens: Some(1024),
  messages: [message.new_user("Explain monads.")],
  tools: [],
  system: Some("You are a Haskell expert."),
  tool_choice: None,
  thinking: None,
) {
  // msg is a Message
  Ok(msg) -> io.println(claude.text_content(msg))
  // The error value is an ApiError
  Error(_api_error) -> io.println("API error")
}

Streaming SSE events

The SDK supports streaming responses via messages.create_stream. Note that gleam_httpc buffers the full response, so events are parsed from the complete SSE payload after the request completes:

import claude/messages
import claude/streaming.{ContentBlockDelta, MessageStop, TextDelta}
import claude/types/message
import gleam/io
import gleam/list
import gleam/option.{None}

case messages.create_stream(
  config: client,
  model: None,
  max_tokens: None,
  messages: [message.new_user("Tell me a story.")],
  tools: [],
  system: None,
  tool_choice: None,
  thinking: None,
) {
  Ok(events) ->
    list.each(events, fn(event) {
      case event {
        ContentBlockDelta(index: _, delta: TextDelta(text)) ->
          io.print(text)
        MessageStop -> io.println("\n[done]")
        _ -> Nil
      }
    })
  Error(_) -> io.println("Stream error")
}

StreamEvent variants: MessageStart, ContentBlockStart, ContentBlockDelta, ContentBlockStop, MessageDelta, MessageStop, Ping, UnknownEvent.
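
Because the payload is buffered, parsing amounts to splitting one string into events. The sketch below shows the general idea using only gleam_stdlib. It is not the SDK's parse_sse: split_sse and parse_block are hypothetical names, each event is reduced to an (event type, data) pair, and multi-line data fields are not handled.

```gleam
import gleam/list
import gleam/string

// Split a complete SSE payload into #(event_type, data) pairs.
// Blocks that lack either field are skipped.
pub fn split_sse(payload: String) -> List(#(String, String)) {
  payload
  |> string.split("\n\n")
  |> list.filter_map(parse_block)
}

// Extract the "event:" and "data:" fields from one blank-line-delimited block.
fn parse_block(block: String) -> Result(#(String, String), Nil) {
  let fields =
    block
    |> string.split("\n")
    |> list.fold(#(Error(Nil), Error(Nil)), fn(acc, line) {
      let #(event, data) = acc
      case line {
        "event: " <> value -> #(Ok(value), data)
        "data: " <> value -> #(event, Ok(value))
        _ -> acc
      }
    })
  case fields {
    #(Ok(event), Ok(data)) -> Ok(#(event, data))
    _ -> Error(Nil)
  }
}
```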

API reference

claude (top-level)

| Function | Description |
| --- | --- |
| new(api_key) -> Config | Create client with API key |
| from_env() -> Result(Config, EnvError) | Create client from ANTHROPIC_API_KEY |
| tool(name, description, input_schema) -> Tool | Define a tool |
| run(client, prompt, tools, handler) -> Result(AgentResult, AgentError) | Run the agent loop |
| run_with_config(config, prompt) -> Result(AgentResult, AgentError) | Run with custom config |
| message(client, prompt) -> Result(Message, ApiError) | Send a single message |
| text_content(message) -> String | Extract text from a Message |
| result_text(result) -> String | Extract text from an AgentResult |
| version() -> String | SDK version string |

claude/client

| Function | Description |
| --- | --- |
| new(api_key) -> Config | Create client config |
| with_model(config, model) -> Config | Set default model |
| with_base_url(config, url) -> Config | Set API base URL |
| with_max_tokens(config, n) -> Config | Set default max tokens |

claude/agent

| Function | Description |
| --- | --- |
| run(config, prompt) -> Result(AgentResult, AgentError) | Run agent loop |
| run_with_messages(config, messages) -> Result(AgentResult, AgentError) | Run with existing history |
| extract_tool_calls(content) -> List(ContentBlock) | Filter tool-use blocks |
| build_tool_results_message(results) -> MessageParam | Build tool results message |

claude/agent/config

| Function | Description |
| --- | --- |
| new(client, tools, tool_handler) -> AgentConfig | Create agent config |
| with_system(config, system) -> AgentConfig | Set system prompt |
| with_model(config, model) -> AgentConfig | Set model |
| with_max_tokens(config, n) -> AgentConfig | Set max tokens |
| with_max_iterations(config, n) -> AgentConfig | Set iteration limit |
| with_thinking(config, budget) -> AgentConfig | Enable extended thinking |
| with_tool_timeout(config, ms) -> AgentConfig | Set tool timeout |
| with_tool_choice(config, choice) -> AgentConfig | Set tool choice strategy |

claude/agent/actor

| Function | Description |
| --- | --- |
| run_with_events(config, prompt, on_event) -> Result(AgentResult, AgentError) | Synchronous with event callbacks |
| start(config, prompt, caller) -> Pid | Async agent in a new BEAM process |

claude/agent/tool_runner

| Function | Description |
| --- | --- |
| execute_concurrent(tool_calls, handler, timeout_ms) -> List(ToolResult) | Run tools concurrently |

claude/messages

| Function | Description |
| --- | --- |
| create(...) -> Result(Message, ApiError) | Send a Messages API request |
| create_simple(config, message) -> Result(Message, ApiError) | Simple one-shot message |
| create_stream(...) -> Result(List(StreamEvent), ApiError) | Streaming Messages API request |
| build_request(...) -> Request(String) | Build HTTP request without sending |
| build_stream_request(...) -> Request(String) | Build streaming HTTP request |

claude/streaming

| Function | Description |
| --- | --- |
| parse_sse(text) -> List(StreamEvent) | Parse SSE payload into events |
| parse_event(event_type, data) -> StreamEvent | Parse a single SSE event |

claude/types/tool

| Type | Description |
| --- | --- |
| Tool(name, description, input_schema) | Tool definition |
| ToolChoice | Auto, Any, SpecificTool(name), NoTools -- each with disable_parallel flag |

claude/types/message

| Type | Description |
| --- | --- |
| Message | Full API response message with content, usage, stop_reason |
| MessageParam | Message sent to the API (role + content) |
| StopReason | EndTurn, ToolUseStop, MaxTokens, StopSequence, PauseTurn, Refusal |
| Usage | Token counts (input, output, cache creation, cache read) |

claude/types/content

| Type | Description |
| --- | --- |
| ContentBlock | Response content: Text, Thinking, ToolUse, ServerToolUse, WebSearchResult |
| ContentBlockParam | Request content: TextParam, ImageParam, DocumentParam, ToolUseParam, ToolResultParam |

claude/types/error

| Type | Description |
| --- | --- |
| ApiError | AuthenticationError, RateLimitError, BadRequestError, NotFoundError, ServerError, ConnectionError, TimeoutError, UnknownError |

Architecture

claude.gleam                    -- Top-level public API (facade)
claude/
  client.gleam                  -- Client config (API key, model, base URL)
  messages.gleam                -- HTTP layer: build and send Messages API requests
  streaming.gleam               -- SSE parser for streaming responses
  agent.gleam                   -- Core agent loop (recursive, synchronous)
  agent/
    config.gleam                -- AgentConfig type and builder functions
    tool_runner.gleam           -- Concurrent tool execution via BEAM processes
    actor.gleam                 -- Event-emitting agent loop + async OTP actor
  types/
    content.gleam               -- ContentBlock and ContentBlockParam types
    message.gleam               -- Message, MessageParam, Role, StopReason, Usage
    tool.gleam                  -- Tool and ToolChoice types
    error.gleam                 -- ApiError type and HTTP status mapping
  json/
    encode.gleam                -- JSON encoding for API request bodies
    decode.gleam                -- JSON decoding for API response bodies

The architecture takes advantage of the BEAM in three ways:

  1. Concurrent tool execution -- When the model returns N tool calls, tool_runner spawns N lightweight BEAM processes that run in parallel. Results are collected with per-tool timeouts using process.receive. This is significantly faster than sequential execution for I/O-bound tools.

  2. Fault isolation -- Each tool runs in its own process with its own heap. If a tool handler crashes or times out, it does not affect the agent loop or other tool executions. The agent simply receives a timeout error for that specific tool.

  3. Actor-based event streaming -- actor.start spawns the entire agent loop in a new BEAM process and communicates results via Subject message passing, fitting naturally into OTP supervision trees and concurrent application architectures.
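
The fault-isolation point can be illustrated in a few lines. In this sketch (isolated is a hypothetical helper, and process.spawn_unlinked assumes gleam_erlang 1.0 or later), the work runs in an unlinked process, so a panic inside it never reaches the caller. The caller just sees no reply before the timeout.

```gleam
import gleam/erlang/process

// Run `work` in its own unlinked process and wait for its reply.
// If the process crashes it never sends, so the caller gets a timeout error
// instead of crashing itself.
pub fn isolated(
  work: fn() -> String,
  timeout_ms: Int,
) -> Result(String, String) {
  let reply = process.new_subject()
  let _pid = process.spawn_unlinked(fn() { process.send(reply, work()) })
  case process.receive(reply, timeout_ms) {
    Ok(value) -> Ok(value)
    Error(Nil) -> Error("Tool execution timed out")
  }
}
```

Calling isolated(fn() { panic as "boom" }, 100) returns an Error after roughly 100 ms while the calling process keeps running. The runtime may still log a crash report for the failed process.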

Development

Build the project:

gleam build

Run tests:

gleam test

Format code:

gleam format

Run an example (add the example file to src/ first):

ANTHROPIC_API_KEY=sk-ant-... gleam run -m weather_agent

License

MIT
