StreamCallbacks.jl is designed to unify streaming interfaces for Large Language Models (LLMs) across multiple providers. It simplifies handling Server-Sent Events (SSE), provides easy debugging by collecting all chunks, and offers various built-in sinks (e.g., stdout, channels, pipes) for streaming data. You can also extend it to implement custom logic for processing streamed data.
- Unified Streaming Interface: Provides a consistent API for streaming responses from various LLM providers.
- Easy Debugging: Collects all received chunks, enabling detailed inspection and debugging.
- Built-in Sinks: Supports common sinks like `stdout`, channels, and pipes out of the box.
- Customizable Callbacks: Extendable interface allows you to define custom behavior for each received chunk.
- OpenAI API (and all compatible providers)
  - `OpenAIChatStream` (alias: `OpenAIStream`) for the Chat Completions API (`/v1/chat/completions`)
  - `OpenAIResponsesStream` for the Responses API (`/v1/responses`)
- Anthropic API (`AnthropicStream`)
- Ollama API (`OllamaStream` for the `/api/chat` endpoint)
When used with PromptingTools.jl, the two OpenAI flavors map to `AbstractOpenAISchema` and `AbstractOpenAIResponsesSchema`, respectively.
You can install StreamCallbacks.jl via the package manager:
```julia
import Pkg
Pkg.add("StreamCallbacks")
```

StreamCallbacks.jl revolves around the `StreamCallback` type, which manages the streaming of messages and the handling of received chunks. Here's a simple example of how to use it:
```julia
using StreamCallbacks
# Create a StreamCallback object that streams output to stdout
cb = StreamCallback(out = stdout)
# Use the callback with your API request (see the usage examples below)
```

Example with the OpenAI Chat Completions API:

```julia
using HTTP, JSON3, StreamCallbacks
url = "https://api.openai.com/v1/chat/completions"
headers = [
"Content-Type" => "application/json",
"Authorization" => "Bearer $(ENV["OPENAI_API_KEY"])"
]
cb = StreamCallback(out = stdout, flavor = OpenAIChatStream())
messages = [Dict("role" => "user", "content" => "What is 2+2?")]
payload = IOBuffer()
JSON3.write(payload, (; stream = true, messages, model = "gpt-4o-mini", stream_options = (; include_usage = true)))
resp = streamed_request!(cb, url, headers, payload)
```
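After the request returns, the callback retains everything it received, which makes ad-hoc debugging easy (a minimal sketch; `chunks` is the same field used for inspection in the PromptingTools examples below):

```julia
# Inspect what was streamed: `cb.chunks` collects every received chunk
length(cb.chunks)      # number of SSE chunks received
cb.chunks[end].data    # raw data of the last chunk
```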
url = "https://api.openai.com/v1/responses"
headers = [
"Content-Type" => "application/json",
"Authorization" => "Bearer $(ENV["OPENAI_API_KEY"])"
]
cb = StreamCallback(out = stdout, flavor = OpenAIResponsesStream())
payload = IOBuffer()
JSON3.write(payload, (; stream = true, input = "What is 2+2?", model = "gpt-4o-mini"))
resp = streamed_request!(cb, url, headers, payload)
```

Note: For debugging, set `verbose = true` in the `StreamCallback` constructor and enable the DEBUG logging level.
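The other flavors follow the same pattern. Here is a minimal sketch for `AnthropicStream`, assuming Anthropic's standard Messages API endpoint, headers, and required `max_tokens` field (the model name and API version are illustrative, not taken from this package's docs):

```julia
using HTTP, JSON3, StreamCallbacks

url = "https://api.anthropic.com/v1/messages"
headers = [
    "content-type" => "application/json",
    "x-api-key" => ENV["ANTHROPIC_API_KEY"],
    "anthropic-version" => "2023-06-01"
]
cb = StreamCallback(out = stdout, flavor = AnthropicStream())
payload = IOBuffer()
JSON3.write(payload, (; stream = true, max_tokens = 256,
    messages = [Dict("role" => "user", "content" => "What is 2+2?")],
    model = "claude-3-5-haiku-latest"))
resp = streamed_request!(cb, url, headers, payload)
```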
StreamCallbacks.jl is integrated with PromptingTools.jl, allowing you to easily handle streaming in AI generation tasks.
```julia
using PromptingTools
const PT = PromptingTools
# Simplest usage: stream output to stdout (the callback is built for you)
msg = aigenerate("Count from 1 to 100."; streamcallback = stdout)
# Create a StreamCallback object to record all chunks
streamcallback = PT.StreamCallback()
msg = aigenerate("Count from 1 to 100."; streamcallback)
# You can inspect each chunk with `streamcallback.chunks`
# Get verbose output with details of each chunk for debugging
streamcallback = PT.StreamCallback(verbose = true, throw_on_error = true)
msg = aigenerate("Count from 1 to 10."; streamcallback)Note: If you provide a StreamCallback object to aigenerate, PromptingTools.jl will configure it and necessary api_kwargs via configure_callback! unless you specify the flavor field. If you provide a StreamCallback with a specific flavor, you need to provide the correct api_kwargs yourself.
For more complex use cases, you can define your own callback methods. This allows you to customize how each chunk is processed. Here's how the interface works:
- Constructor: `StreamCallback(; kwargs...)` creates a new `StreamCallback` object.
- `streamed_request!`: `streamed_request!(cb, url, headers, input)` sends a streaming POST request and processes the response using the callback.

The `streamed_request!` function internally calls:

- `extract_chunks`: `extract_chunks(flavor, blob)` extracts chunks from the received SSE blob.
- `callback`: `callback(cb, chunk)` processes each received chunk.
- `extract_content`: `extract_content(flavor, chunk)` extracts the content from the chunk.
- `print_content`: `print_content(out, text)` prints the content to the output stream.
- `is_done`: `is_done(flavor, chunk)` checks if the streaming is complete.
- `build_response_body`: `build_response_body(flavor, cb)` builds the final response body from the collected chunks.
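To make the dispatch points concrete, here is a minimal sketch of overloading `extract_content` for a hypothetical flavor (`MyFlavor` and the `text` key are assumptions for illustration; it also assumes flavors subtype the package's `AbstractStreamFlavor` and that `StreamChunk` carries the parsed payload in its `json` field):

```julia
using StreamCallbacks

# Hypothetical flavor for a provider whose chunks carry text under a `text` key
struct MyFlavor <: StreamCallbacks.AbstractStreamFlavor end

function StreamCallbacks.extract_content(flavor::MyFlavor, chunk::StreamChunk; kwargs...)
    # `chunk.json` holds the parsed payload when the chunk contained valid JSON
    isnothing(chunk.json) && return nothing
    get(chunk.json, :text, nothing)
end
```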
Suppose you want to process each chunk and send it to a custom sink, such as a logging system or a GUI component. You can extend the `callback` method for your own callback type (or overload `print_content` for a custom output sink, as sketched after this example):
```julia
using StreamCallbacks
struct MyCustomCallback <: StreamCallbacks.AbstractCallback
out::IO
# ... add additional fields if necessary
end
function StreamCallbacks.callback(cb::MyCustomCallback, chunk::StreamChunk; kwargs...)
# Custom logic to handle the text
println("Received chunk: ", chunk.data)
# For example, send the text to a GUI component or log it
end
```
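Alternatively, if you only need to change where the extracted text goes, you can overload `print_content` for a custom sink type. A minimal sketch (`LoggerSink` is hypothetical; it assumes the `out` field of `StreamCallback` can hold any type you dispatch on):

```julia
using StreamCallbacks

# Hypothetical sink that forwards streamed text to the logging system
struct LoggerSink end

function StreamCallbacks.print_content(out::LoggerSink, text::AbstractString; kwargs...)
    @info "Streamed content" text
end

# Plug it into a standard StreamCallback
cb = StreamCallback(out = LoggerSink(), flavor = OpenAIChatStream())
```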