StreamCallbacks.jl

Unified streaming interfaces for LLMs across various API providers. No more manual SSE parsing!

StreamCallbacks.jl is designed to unify streaming interfaces for Large Language Models (LLMs) across multiple providers. It simplifies handling Server-Sent Events (SSE), provides easy debugging by collecting all chunks, and offers various built-in sinks (e.g., stdout, channels, pipes) for streaming data. You can also extend it to implement custom logic for processing streamed data.

Table of Contents

  • Features
  • Supported Providers
  • Installation
  • Getting Started
  • Usage Examples
  • Extending StreamCallbacks.jl

Features

  • Unified Streaming Interface: Provides a consistent API for streaming responses from various LLM providers.
  • Easy Debugging: Collects all received chunks, enabling detailed inspection and debugging.
  • Built-in Sinks: Supports common sinks like stdout, channels, and pipes out of the box (see the channel sketch after this list).
  • Customizable Callbacks: Extendable interface allows you to define custom behavior for each received chunk.
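
For example, streaming into a Channel lets a separate task consume the text as it arrives. A minimal sketch (the channel sink is one of the built-ins listed above; the element type and buffer size are assumptions):

using StreamCallbacks

# Consumer task that drains text chunks as they stream in
ch = Channel{String}(100)    # assumed element type; buffer size is illustrative
cb = StreamCallback(out = ch)
consumer = @async for text in ch
    print(text)  # forward to a GUI, logger, or any other sink
end
# close(ch) after the request completes to end the consumer task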

Supported Providers

  • OpenAI API (and all compatible providers)
    • OpenAIChatStream (alias: OpenAIStream) for Chat Completions API (/v1/chat/completions)
    • OpenAIResponsesStream for Responses API (/v1/responses)
  • Anthropic API (AnthropicStream)
  • Ollama API (OllamaStream for /api/chat endpoint)

When used with PromptingTools.jl, the two OpenAI flavors map to AbstractOpenAISchema (Chat Completions) and AbstractOpenAIResponsesSchema (Responses), respectively.
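
Each provider corresponds to a flavor you pass to the StreamCallback constructor (all four constructors are listed above):

using StreamCallbacks

cb_chat   = StreamCallback(out = stdout, flavor = OpenAIChatStream())      # Chat Completions API
cb_resp   = StreamCallback(out = stdout, flavor = OpenAIResponsesStream()) # Responses API
cb_claude = StreamCallback(out = stdout, flavor = AnthropicStream())       # Anthropic API
cb_ollama = StreamCallback(out = stdout, flavor = OllamaStream())          # Ollama /api/chat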

Installation

You can install StreamCallbacks.jl via the package manager:

import Pkg
Pkg.add("StreamCallbacks")
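
Or, equivalently, from the Pkg REPL mode (press ] at the Julia prompt):

pkg> add StreamCallbacks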

Getting Started

StreamCallbacks.jl revolves around the StreamCallback type, which manages the streaming of messages and the handling of received chunks. Here's a simple example of how to use it:

using StreamCallbacks

# Create a StreamCallback object that streams output to stdout
cb = StreamCallback(out = stdout)

# Use the callback with your API request (see Usage Examples below)

Usage Examples

OpenAI Chat Completions API

using HTTP, JSON3, StreamCallbacks

url = "https://api.openai.com/v1/chat/completions"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(ENV["OPENAI_API_KEY"])"
]

cb = StreamCallback(out = stdout, flavor = OpenAIChatStream())
messages = [Dict("role" => "user", "content" => "What is 2+2?")]
payload = IOBuffer()
JSON3.write(payload, (; stream = true, messages, model = "gpt-4o-mini", stream_options = (; include_usage = true)))

resp = streamed_request!(cb, url, headers, payload)
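
After the call returns, the streamed text has been printed to stdout and every received chunk is retained on the callback (see Features above), so you can inspect them:

length(cb.chunks)   # number of chunks received
cb.chunks[1].data   # raw data of the first chunk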

OpenAI Responses API

using HTTP, JSON3, StreamCallbacks

url = "https://api.openai.com/v1/responses"
headers = [
    "Content-Type" => "application/json",
    "Authorization" => "Bearer $(ENV["OPENAI_API_KEY"])"
]

cb = StreamCallback(out = stdout, flavor = OpenAIResponsesStream())
payload = IOBuffer()
JSON3.write(payload, (; stream = true, input = "What is 2+2?", model = "gpt-4o-mini"))

resp = streamed_request!(cb, url, headers, payload)
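
The same pattern works for the Anthropic API. A sketch assuming the standard Anthropic Messages endpoint and headers (the model name and anthropic-version value are illustrative; check Anthropic's documentation for current values):

using HTTP, JSON3, StreamCallbacks

url = "https://api.anthropic.com/v1/messages"
headers = [
    "Content-Type" => "application/json",
    "x-api-key" => ENV["ANTHROPIC_API_KEY"],
    "anthropic-version" => "2023-06-01"
]

cb = StreamCallback(out = stdout, flavor = AnthropicStream())
messages = [Dict("role" => "user", "content" => "What is 2+2?")]
payload = IOBuffer()
JSON3.write(payload, (; stream = true, messages, model = "claude-3-5-haiku-latest", max_tokens = 256))

resp = streamed_request!(cb, url, headers, payload)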

Note: For debugging, set verbose = true in the StreamCallback constructor and enable the Debug logging level.
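
For example (the Logging setup is standard Julia; verbose is the constructor flag mentioned above):

using Logging, StreamCallbacks

debug_logger = ConsoleLogger(stderr, Logging.Debug)
with_logger(debug_logger) do
    cb = StreamCallback(out = stdout, verbose = true)
    # ... run streamed_request! here to see per-chunk debug output
end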

Example with PromptingTools.jl

StreamCallbacks.jl is integrated with PromptingTools.jl, allowing you to easily handle streaming in AI generation tasks.

using PromptingTools
const PT = PromptingTools

# Simplest usage: stream output to stdout (the callback is built for you)
msg = aigenerate("Count from 1 to 100."; streamcallback = stdout)

# Create a StreamCallback object to record all chunks
streamcallback = PT.StreamCallback()
msg = aigenerate("Count from 1 to 100."; streamcallback)
# You can inspect each chunk with `streamcallback.chunks`

# Get verbose output with details of each chunk for debugging
streamcallback = PT.StreamCallback(verbose = true, throw_on_error = true)
msg = aigenerate("Count from 1 to 10."; streamcallback)

Note: If you provide a StreamCallback object to aigenerate, PromptingTools.jl will configure it and the necessary api_kwargs via configure_callback! unless you specify the flavor field. If you provide a StreamCallback with a specific flavor, you must supply the correct api_kwargs yourself.
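
For instance, if you pin the flavor yourself, pass the streaming api_kwargs explicitly. A sketch (the kwargs mirror the raw Chat Completions payload shown above):

using PromptingTools, StreamCallbacks
const PT = PromptingTools

streamcallback = PT.StreamCallback(; flavor = StreamCallbacks.OpenAIChatStream())
msg = aigenerate("Count from 1 to 10."; streamcallback,
    api_kwargs = (; stream = true, stream_options = (; include_usage = true)))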

Extending StreamCallbacks.jl

For more complex use cases, you can define your own callback methods. This allows you to customize how each chunk is processed. Here's how the interface works:

StreamCallback Interface

  • Constructor: StreamCallback(; kwargs...) creates a new StreamCallback object.
  • streamed_request!: streamed_request!(cb, url, headers, input) sends a streaming POST request and processes the response using the callback.

The streamed_request! function internally calls:

  • extract_chunks: extract_chunks(flavor, blob) extracts chunks from the received SSE blob.
  • callback: callback(cb, chunk) processes each received chunk.
    • extract_content: extract_content(flavor, chunk) extracts the content from the chunk.
    • print_content: print_content(out, text) prints the content to the output stream.
  • is_done: is_done(flavor, chunk) checks if the streaming is complete.
  • build_response_body: build_response_body(flavor, cb) builds the final response body from the collected chunks.
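
Conceptually, the pipeline wires these together roughly as follows (an illustrative outline of the control flow, not the actual implementation):

# Pseudocode for the loop inside streamed_request!:
# for each SSE blob received on the HTTP response body:
#     chunks = extract_chunks(cb.flavor, blob)
#     for chunk in chunks
#         callback(cb, chunk)            # extract_content + print_content happen here
#         is_done(cb.flavor, chunk) && break
#     end
# end
# body = build_response_body(cb.flavor, cb)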

Custom Callback Example

Suppose you want to process each chunk and send it to a custom sink, such as a logging system or a GUI component. You can extend the callback method:

using StreamCallbacks

struct MyCustomCallback <: StreamCallbacks.AbstractStreamCallback
    out::IO
    # ... add additional fields if necessary
end

function StreamCallbacks.callback(cb::MyCustomCallback, chunk::StreamChunk; kwargs...)
    # Custom logic for each received chunk
    println("Received chunk: ", chunk.data)
    # For example, forward the data to a GUI component or a logger
end
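
Alternatively, if you only need to redirect the text and keep the default chunk handling, extending print_content for a custom sink type is enough. A minimal sketch (LogSink is a hypothetical type, and it assumes StreamCallback accepts any sink for out; the kwargs... in the signature is also an assumption):

using StreamCallbacks

struct LogSink end  # hypothetical custom sink

function StreamCallbacks.print_content(out::LogSink, text; kwargs...)
    @info "streamed" text  # forward the extracted text to the logging system
end

cb = StreamCallback(out = LogSink(), flavor = OpenAIChatStream())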
