Skip to content

High-performance, async-first system that computes and monitors LLM API spend across millions of requests with PostgreSQL and Prometheus.

License

Notifications You must be signed in to change notification settings

kaushal07wick/litellm-aggregator

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

⚡ LiteLLM Spend Aggregator

Built for LiteLLM — a high-performance, async-first system that computes and monitors LLM API spend across millions of requests with PostgreSQL and Prometheus.


🧩 Overview

This system was built for LiteLLM, to solve one of the hardest scaling problems in model orchestration:

How to compute real-time spend across 10M+ LLM requests efficiently — without slowing down inference or requiring distributed compute.

The LiteLLM Spend Aggregator runs continuous, parallelized cost aggregation using PostgreSQL as a compute engine, async I/O for throughput, and Prometheus metrics for live observability.


🧠 Architecture


LiteLLM API Calls
│
▼
┌────────────────────────────┐
│ LiteLLM Cost Logger        │
│ (logs → Postgres)          │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ Async Spend Aggregator     │
│ app/aggregator.py          │
│ • Sharded async workers    │
│ • In-DB GROUP BY SUM()     │
│ • Conflict-safe upserts    │
│ • O(1) cleanup via TRUNC   │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ FastAPI + Prometheus       │
│ app/main.py                │
│ • /aggregate endpoint      │
│ • /metrics for dashboards  │
└────────────────────────────┘


🧱 Database Schema

-- Raw LiteLLM API logs
CREATE TABLE cost_logs (
    id BIGSERIAL PRIMARY KEY,
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    cost_usd FLOAT NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Aggregated spend summary
CREATE TABLE spend_summary (
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    total_cost FLOAT DEFAULT 0,
    updated_at TIMESTAMP DEFAULT NOW(),
    PRIMARY KEY (provider, model)
);

Indexes

CREATE INDEX idx_cost_logs_provider ON cost_logs(provider);
CREATE INDEX idx_cost_logs_provider_model ON cost_logs(provider, model);

⚙️ SQL Optimization Techniques Used

Technique Purpose SQL / Implementation
CTE-based aggregation Perform set-based aggregation directly inside Postgres (faster than Python loops). WITH new_data AS (SELECT provider, model, SUM(cost_usd) AS batch_sum FROM cost_logs GROUP BY provider, model) INSERT INTO spend_summary ...
Conflict-safe upserts Prevent duplicates and enable incremental updates atomically. INSERT INTO spend_summary (...) VALUES (...) ON CONFLICT (provider, model) DO UPDATE SET total_cost = spend_summary.total_cost + EXCLUDED.total_cost;
Index-driven scans Accelerate GROUP BY operations using provider/model indexes. CREATE INDEX idx_cost_logs_provider_model ON cost_logs(provider, model);
Parallel shard aggregation Divide aggregation by provider and run async tasks per shard. Each provider processed via a separate async worker.
O(1) cleanup (TRUNCATE) Reset cost_logs instantly after full aggregation — avoids slow DELETE. TRUNCATE TABLE cost_logs;
In-memory reduce (Python) Combine partial aggregates per shard before writing to DB. merged[(provider, model)] += cost_usd
Async streaming cursor Fetch rows incrementally to avoid loading large datasets into memory. conn.stream(query, params)
Idempotent design Re-running the same aggregation has no side effects. Atomic upsert + delta processing
Batch insertion Send bulk upserts as a single SQL transaction. Executed within one async with engine.begin() block.

🧮 Aggregation Flow

  1. Shard by Provider → Each provider (e.g., OpenAI, Anthropic, Azure) runs in its own async task.
  2. Stream Data → Each worker streams rows from cost_logs using asyncpg cursors.
  3. In-memory Reduce → Local aggregation of (provider, model) → SUM(cost_usd).
  4. Bulk Upsert → One atomic INSERT ... ON CONFLICT DO UPDATE per cycle.
  5. Truncate → Reset table for the next cycle in O(1) time.

🚀 Running It

1️⃣ Install dependencies

pip install -r requirements.txt

2️⃣ Seed synthetic logs

python scripts/seed_cost_logs.py  # Generates ~10M test records

3️⃣ Start the API + Aggregator

uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

🧩 Endpoints

Endpoint Description
/ Health check
/aggregate Current spend summary
/metrics Prometheus metrics scrape endpoint

Example JSON:

{
  "records": 25,
  "data": [
    {"provider": "openai", "model": "gpt-4o", "total_cost": 10321.72},
    {"provider": "anthropic", "model": "claude-3", "total_cost": 9823.15}
  ]
}

⚡ Performance Results

Metric Result
Input volume 10,000,000 cost_logs
Providers 5
Aggregation time ~134.4 seconds
Rows processed/sec ~540,000
Write throughput ~12 MB/s
CPU utilization ~65% on 8-core CPU
Memory usage < 300MB (streamed reads)

⚡ Benchmarks — Real Aggregation Performance

Tested on: • PostgreSQL 16 (asyncpg driver) • 8-core CPU / 16GB RAM • 10,000,000 LiteLLM cost logs • Parallel async aggregation (8 shards)

Metric Result
Total logs aggregated 10,000,000
Providers processed 5
Models tracked 25
Scan time 132.91 seconds
Total aggregation time 133.73 seconds
Rows processed/sec ~74,800
Cycle completion time 134.85 seconds
CPU usage ~70%
Memory footprint ~250MB

🏁 Console Output Snapshot

image


🧠 Scaling Techniques

Layer Optimization Description
Postgres Set-based aggregation SQL handles the heavy math (SUM, GROUP BY).
Async Python Parallel shards Run per-provider workers concurrently.
Connection pooling asyncpg engine Keeps connections hot between cycles.
Reduced lock contention Upserts per shard Each provider updates separate summary rows.
Truncate vs Delete O(1) cleanup No vacuum overhead.
Prometheus metrics Low-cost introspection Instant visibility without DB queries.

🔭 Future Work

Feature Description
Pub/Sub integration Replace DB ingestion with real-time GCS Pub/Sub events.
BigQuery export Push aggregates to data warehouse for analytics.
Anomaly detection Identify abnormal spend deltas.
Multi-tenant partitioning Cost isolation per API key or organization.

💡 Why This Matters for LiteLLM

“You can’t optimize what you can’t measure.” — Peter Drucker

This aggregation system provides LiteLLM with:

  • Transparent cost visibility across providers and regions
  • Real-time financial telemetry for API consumption
  • Anomaly detection hooks for runaway workloads
  • Infra efficiency — compute cost summaries directly in the DB layer

🧱 Tech Stack

Component Purpose
Python 3.12+ Core runtime
FastAPI REST + lifecycle management
SQLAlchemy (asyncpg) Async ORM layer
PostgreSQL Analytical storage
Prometheus Client Metrics exporter
LiteLLM Source of API usage logs

🤝 Contributing

Contributions are welcome! If you want to improve query performance, add new visualizations, or integrate with additional data backends — open a PR or start a discussion.

Guidelines

  1. Fork the repo and create a feature branch.
  2. Follow the existing async + Postgres design pattern.
  3. Include benchmark results or performance notes.
  4. Submit a pull request — clear, benchmarked changes are preferred.

🪪 License

This project is licensed under the MIT License. You are free to use, modify, and distribute it with proper attribution.

MIT License © 2025 kkc  
Permission is hereby granted, free of charge, to any person obtaining a copy of this software...

About

High-performance, async-first system that computes and monitors LLM API spend across millions of requests with PostgreSQL and Prometheus.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages