I built this repo to understand system-architecture decisions for high-volume data pipelines with AI-based decision layers.
Aetheris is a distributed anomaly-detection and decision pipeline for high-volume financial events.
The project began as a single subscriber demo and evolved into a multi-stage system with:
- fast edge anomaly detection
- buffered orchestration
- multi-agent consensus decisions
- Streamed 50,000 transactions from CSV into MQTT (`aetheris-ingestion/publish.js`)
- Built distributed edge processors (`aetheris-ingestion/subscribe.js`) with:
  - per-account rolling window (100 points)
  - Z-score anomaly detection
  - deterministic account-to-node routing
- Added a Rust+Wasm edge scoring option (`aetheris-edge-wasm` + `aetheris-ingestion/subscribe-wasm.js`)
- Published only anomalies to `aetheris/exceptions`
- Created a separate Python intelligence service (`aetheris-agents`) with:
  - Redis priority buffering (`immediate` vs `batch`)
  - semantic batch inference (single LLM call per role per batch)
  - short wait-window batching to collect micro-bursts
  - LangGraph orchestration
  - 3-agent vote (`Analyst`, `Auditor`, `Strategist`)
  - 2/3 consensus output to `aetheris/commands`
- Added queue and decision metrics logging
- Seeded Mongo transaction history (49,500 docs) for tool-based history lookup
- Added Mongo decision persistence (`exceptions`, `commands`, `tool_audit`) and startup healthchecks
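The edge-detection core (per-account rolling window, Z-score scoring, deterministic routing) can be sketched in Python. The real implementation lives in `aetheris-ingestion/subscribe.js`, so the class name, the 3.0 threshold, and the SHA-256 routing hash here are illustrative assumptions, not the repo's API:

```python
import hashlib
from collections import defaultdict, deque
from statistics import mean, pstdev

WINDOW = 100        # per-account rolling window size (matches the 100-point window above)
THRESHOLD = 3.0     # illustrative Z-score cutoff; the repo's actual threshold may differ

class ZScoreDetector:
    """Keeps a rolling window per account and flags outliers by Z-score."""
    def __init__(self, window=WINDOW, threshold=THRESHOLD):
        self.history = defaultdict(lambda: deque(maxlen=window))
        self.threshold = threshold

    def score(self, account, amount):
        hist = self.history[account]
        anomaly = False
        if len(hist) >= 2:
            mu, sigma = mean(hist), pstdev(hist)
            if sigma > 0 and abs(amount - mu) / sigma > self.threshold:
                anomaly = True
        hist.append(amount)
        return anomaly

def node_for_account(account, node_count=5):
    """Deterministic account-to-node routing: the same account always maps to
    the same node, so that node's rolling window stays consistent."""
    digest = hashlib.sha256(account.encode()).hexdigest()
    return int(digest, 16) % node_count

detector = ZScoreDetector()
for amt in [100, 102, 99, 101, 100, 98, 5000]:   # last value is an obvious outlier
    flagged = detector.score("acct-42", amt)
print("outlier flagged:", flagged)
```

Because the baseline is per-account (not global), a value that is ordinary for one account can still be flagged for another, which is exactly the behavioral fix described in the iteration notes.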
- Edge computations are cheap and fast; LLM decisions are slow and expensive.
- The pipeline therefore filters early and escalates only exceptions.
- Redis sits between fast producers and slow agents to absorb bursts.
- Multi-agent vote reduces single-model decision brittleness.
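The 2/3 consensus rule can be illustrated with a small tally. The real flow runs inside LangGraph, so this standalone `consensus` helper and the decision labels are a hypothetical sketch of the voting rule, not the repo's code:

```python
from collections import Counter

def consensus(votes, quorum=2):
    """Return the decision agreed on by at least `quorum` of the three agents,
    or 'escalate' when no 2/3 majority exists."""
    decision, count = Counter(votes.values()).most_common(1)[0]
    return decision if count >= quorum else "escalate"

votes = {"Analyst": "block", "Auditor": "block", "Strategist": "allow"}
print(consensus(votes))   # -> block (2 of 3 agree)

split = {"Analyst": "block", "Auditor": "allow", "Strategist": "review"}
print(consensus(split))   # -> escalate (no majority)
```

A disagreeing third agent cannot flip the outcome on its own, which is how the vote reduces single-model brittleness.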
- Raw pub/sub sanity check
- Published CSV rows to MQTT and subscribed to confirm transport.
- Goal: prove ingest works before adding intelligence.
- Single-node anomaly detector
- Added rolling Z-score logic to subscriber.
- First issue: no anomalies at high threshold because synthetic data range was tight.
- Fix: switched from global baseline to per-account history.
- Initial multi-node attempt
- Ran multiple subscribers to mimic mesh behavior.
- Issue: all nodes did duplicate work (redundancy, not parallelism).
- Workload partitioning iteration
- Tried prefix/range splits, then modulo hash routing.
- Learned: balance and state consistency matter more than "just more nodes".
- Node count reasoning
- Discussed why not 100 nodes: broker fanout, coordination overhead, low marginal gain.
- Settled on a practical small-node setup for this workload.
- Intelligence layer separation
- Moved agents out of ingestion service into separate Python service.
- Reason: clearer ownership, easier scaling, cleaner architecture.
- Prompt-only multi-agent -> real orchestrated flow
- Replaced simple role prompts with structured orchestrator + tool execution pattern.
- Added LangGraph state flow and explicit consensus step.
- Buffering and backpressure control
- Direct processing showed queue lag under sustained anomaly volume.
- Added Redis streams with scoring and priority routing.
- Observability pass
- Added periodic metrics to see queued vs processed and decision mix.
- Reason: needed objective runtime visibility, not just logs.
- True semantic batching
- Reworked agent flow so queues are drained in chunks and decisions are produced in batch.
- Added max-wait windows to accumulate small bursts before inference.
- Result: far fewer LLM API calls than per-transaction inference.
- Persistence and fail-fast operations
- Persisted incoming exceptions and outgoing commands to Mongo for traceability.
- Added tool-call audit logging with TTL cleanup to avoid unbounded growth.
- Added health checks (MQTT, Redis, Mongo) at startup to fail fast on broken dependencies.
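The semantic-batching behavior above (drain the queue in chunks, wait briefly to accumulate bursts, one inference call per batch) can be sketched as a small loop. The in-memory `deque`, the sleep-based polling, and the fake `infer_batch` call stand in for the Redis stream and the real LLM client:

```python
import time
from collections import deque

BATCH_SIZE = 10      # mirrors BATCH_SIZE
MAX_WAIT_MS = 300    # mirrors BATCH_MAX_WAIT_MS

def collect_batch(queue, batch_size=BATCH_SIZE, max_wait_ms=MAX_WAIT_MS):
    """Drain up to batch_size items, waiting at most max_wait_ms for stragglers,
    so micro-bursts are grouped into one LLM call instead of many tiny ones."""
    batch = []
    deadline = time.monotonic() + max_wait_ms / 1000
    while len(batch) < batch_size and time.monotonic() < deadline:
        if queue:
            batch.append(queue.popleft())
        else:
            time.sleep(0.005)  # stand-in for a blocking Redis XREAD
    return batch

def infer_batch(batch):
    """Stand-in for one semantic-batch LLM call covering the whole chunk."""
    return [{"txn": item, "decision": "review"} for item in batch]

queue = deque(f"txn-{i}" for i in range(23))
calls = 0
while queue:
    batch = collect_batch(queue)
    if batch:
        infer_batch(batch)
        calls += 1
print("LLM calls:", calls)   # 3 batched calls for 23 items instead of 23 per-txn calls
```

The wait-window trade-off from the lessons below is visible here: a shorter `max_wait_ms` returns smaller batches sooner (more calls, lower latency), a longer one accumulates more per call.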
- Stateful anomaly detection design: global Z-score looked mathematically fine but behaviorally wrong; per-account baselines were required.
- Work partition correctness: ensuring the same account consistently maps to the same node to preserve rolling state.
- Throughput mismatch: edge layer can emit faster than LLM layer can decide.
- Backpressure and queue growth: even with healthy processing, backlog still grows under sustained input.
- Priority policy tuning: choosing the suspicion threshold (`immediate` vs `batch`) changes cost/latency trade-offs.
- Node scaling trade-off: more nodes improved parallelism only up to broker/network overhead limits.
- Batch tuning complexity: semantic batching works, but batch size and wait-window values still need workload-specific tuning.
- Burst handling trade-off: short mini-spikes ("small bursts") can cause too many tiny LLM calls if wait windows are too low, or extra latency if wait windows are too high.
- Explainability vs speed: richer multi-agent reasoning increases latency and operational cost.
- Data realism: synthetic dataset distribution affects anomaly rates and threshold behavior.
- Operational coordination: many moving pieces (EMQX, Redis, Mongo, Node services, Python service) increased runbook complexity.
- Persistence hygiene: storing exceptions/commands/tool audits requires index and retention discipline.
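The priority policy reduces to a one-line routing rule over the edge suspicion score. The 0.8 cutoff and the `suspicion` field name here are invented for illustration, not the repo's configured values:

```python
SUSPICION_THRESHOLD = 0.8  # hypothetical cutoff; tuning it trades cost against latency

def route(exception):
    """Route a scored exception to the `immediate` or `batch` Redis stream.
    A lower threshold sends more traffic to `immediate` (faster decisions,
    more LLM calls); a higher one batches more (cheaper, higher latency
    for borderline cases)."""
    return "immediate" if exception["suspicion"] >= SUSPICION_THRESHOLD else "batch"

print(route({"suspicion": 0.95}))  # -> immediate
print(route({"suspicion": 0.40}))  # -> batch
```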
- `IMMEDIATE_BATCH_SIZE`: 5 (current)
- `BATCH_SIZE`: 10 (current)
- `IMMEDIATE_BATCH_MAX_WAIT_MS`: 150 ms (current)
- `BATCH_MAX_WAIT_MS`: 300 ms (current)
- Earlier queue-read behavior before wait-window tuning: 1000 ms default read blocking.
```
MONGODB_TRANSACTIONS_COLLECTION=transactions
MONGODB_EXCEPTIONS_COLLECTION=exceptions
MONGODB_COMMANDS_COLLECTION=commands
MONGODB_TOOL_AUDIT_COLLECTION=tool_audit
MONGODB_AUDIT_TTL_DAYS=30
FAIL_FAST_HEALTHCHECK=true
```
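The `MONGODB_AUDIT_TTL_DAYS` knob maps to a Mongo TTL index on the audit collection. A minimal sketch of the conversion and index spec, using a plain dict so it runs without a Mongo server; the `created_at` field name is an assumption about the audit documents:

```python
AUDIT_TTL_DAYS = 30  # from MONGODB_AUDIT_TTL_DAYS

def ttl_index_options(days):
    """Build the TTL index spec for the tool_audit collection. Mongo's TTL
    monitor deletes documents once `created_at` is older than expireAfterSeconds,
    which keeps the audit log from growing without bound."""
    return {"keys": [("created_at", 1)], "expireAfterSeconds": days * 86_400}

opts = ttl_index_options(AUDIT_TTL_DAYS)
print(opts["expireAfterSeconds"])  # 2592000 seconds = 30 days
# With pymongo this would be applied roughly as:
# db.tool_audit.create_index("created_at", expireAfterSeconds=opts["expireAfterSeconds"])
```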
```
Aetheris/
  data/                 # synthetic finance dataset (50k+ rows)
  SRS.md                # requirements/spec notes
  aetheris-edge-wasm/   # Rust edge scoring module compiled to Wasm
  aetheris-ingestion/   # Node.js stream + edge detection
  aetheris-agents/      # Python uv LangGraph intelligence layer
```
- Edge/Ingestion: Node.js, MQTT.js, csv-parser
- Edge compute option: Rust -> WebAssembly (`wasm32-unknown-unknown`)
- Broker: EMQX
- Intelligence: Python 3.11, uv, LangGraph, langchain-groq
- Buffer/state: Redis Streams
- Historical context tool: MongoDB
Why Wasm:
- It moves the hottest edge math path (Z-score scoring) to compiled Rust for lower CPU overhead.
- It keeps the existing Node MQTT pipeline intact, so migration is incremental and low-risk.
- It gives deterministic compute behavior across runtimes while preserving current topic contracts.
Note: Legacy Node-based agent prototypes were removed from aetheris-ingestion to keep the active integration path single-source through aetheris-agents.
```
docker compose up -d
```

From `aetheris-ingestion`:

```
npm install
node subscribe.js -- --node-name=EdgeNode1 --node-count=5
node subscribe.js -- --node-name=EdgeNode2 --node-count=5
node subscribe.js -- --node-name=EdgeNode3 --node-count=5
node subscribe.js -- --node-name=EdgeNode4 --node-count=5
node subscribe.js -- --node-name=EdgeNode5 --node-count=5
```

Build Wasm first (from `aetheris-ingestion`):

```
npm run wasm:build
```

Then run Wasm-backed nodes:

```
npm run wasm:node1
npm run wasm:node2
npm run wasm:node3
npm run wasm:node4
npm run wasm:node5
```

From `aetheris-agents`:

```
uv sync
copy .env.example .env
# set GROQ_API_KEY and GROQ_MODEL
# optional throughput knobs:
# IMMEDIATE_BATCH_SIZE=5
# BATCH_SIZE=10
# IMMEDIATE_BATCH_MAX_WAIT_MS=150
# BATCH_MAX_WAIT_MS=300
# optional persistence knobs:
# MONGODB_EXCEPTIONS_COLLECTION=exceptions
# MONGODB_COMMANDS_COLLECTION=commands
# MONGODB_TOOL_AUDIT_COLLECTION=tool_audit
# MONGODB_AUDIT_TTL_DAYS=30
# FAIL_FAST_HEALTHCHECK=true
uv run python main.py
```

For low-cost testing:

```
GROQ_MODEL=llama-3.1-8b-instant
```

From `aetheris-ingestion`:

```
node publish.js
```

- Throughput is improved by semantic batching, but LLM RPM limits can still dominate under sustained spikes.
- LLM latency dominates throughput.
- Orchestrator is single-process in current form.
- Historical tooling and decision audit path assume Mongo collection readiness.
- Multiple Redis consumer workers
- Retry/backoff + dead-letter handling for 429/network failures
- Rule-based triage before LLM (top-risk-only escalation)
- Prometheus/Grafana instrumentation
- Rust/Wasm edge runtime migration path
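The retry/backoff item above could look like the following exponential-backoff sketch. `with_backoff`, the retry counts, and the dead-letter hand-off are all hypothetical, since this is future work rather than existing code:

```python
import random
import time

def with_backoff(call, max_retries=4, base_delay=0.5, dead_letter=None):
    """Retry a flaky call (e.g. an LLM request hitting 429s) with exponential
    backoff plus jitter; items that exhaust retries go to a dead-letter handler
    instead of being silently dropped."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception as exc:
            if attempt == max_retries - 1:
                if dead_letter:
                    dead_letter(exc)
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)

# Demo: a call that fails twice with a rate-limit error, then succeeds.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "decision"

result = with_backoff(flaky, base_delay=0.01)
print(result)  # succeeds on the third attempt
```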
This repo shows the full engineering thought process: build fast, measure bottlenecks, and refactor the architecture where needed instead of forcing a one-shot design.
Thanks to this dataset for powering the project experiments: https://www.kaggle.com/datasets/testdatabox/finance-fraud-and-loans-dataset-testdatabox