Aetheris

I built this repo to understand system-architecture decisions for high-volume data pipelines with AI-based decision layers.

Aetheris is a distributed anomaly-detection and decision pipeline for high-volume financial events.

The project began as a single subscriber demo and evolved into a multi-stage system with:

  • fast edge anomaly detection
  • buffered orchestration
  • multi-agent consensus decisions

What I actually built

  • Streamed 50,000 transactions from CSV into MQTT (aetheris-ingestion/publish.js)
  • Built distributed edge processors (aetheris-ingestion/subscribe.js) with:
    • per-account rolling window (100 points)
    • Z-score anomaly detection
    • deterministic account-to-node routing (sketched after this list)
  • Added a Rust+Wasm edge scoring option (aetheris-edge-wasm + aetheris-ingestion/subscribe-wasm.js)
  • Published only anomalies to aetheris/exceptions
  • Created a separate Python intelligence service (aetheris-agents) with:
    • Redis priority buffering (immediate vs batch)
    • semantic batch inference (single LLM call per role per batch)
    • short wait-window batching to collect micro-bursts
    • LangGraph orchestration
    • 3-agent vote (Analyst, Auditor, Strategist)
    • 2/3 consensus output to aetheris/commands
  • Added queue and decision metrics logging
  • Seeded Mongo transaction history (49500 docs) for tool-based history lookup
  • Added Mongo decision persistence (exceptions, commands, tool_audit) and startup healthchecks
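
A minimal sketch of the edge logic described above, assuming a Z-score threshold of 3.0 and a five-node setup (the real implementation is the JavaScript in aetheris-ingestion/subscribe.js; the threshold, baseline warm-up, and hash choice here are illustrative assumptions):

import hashlib
from collections import defaultdict, deque
from statistics import mean, pstdev

WINDOW = 100        # per-account rolling window, as described above
Z_THRESHOLD = 3.0   # assumed anomaly threshold
NODE_COUNT = 5      # matches the five-node run example below

history = defaultdict(lambda: deque(maxlen=WINDOW))

def owns_account(account_id: str, node_index: int) -> bool:
    # Deterministic account-to-node routing: a stable hash keeps the same
    # account on the same node, so its rolling state stays consistent.
    digest = int(hashlib.sha1(account_id.encode()).hexdigest(), 16)
    return digest % NODE_COUNT == node_index

def is_anomaly(account_id: str, amount: float) -> bool:
    window = history[account_id]
    anomalous = False
    if len(window) >= 10:                  # wait for a minimal baseline first
        mu, sigma = mean(window), pstdev(window)
        if sigma > 0:
            anomalous = abs(amount - mu) / sigma > Z_THRESHOLD
    window.append(amount)                  # update rolling state after scoring
    return anomalous                       # True -> publish to aetheris/exceptions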

Why this architecture

  • Edge computations are cheap and fast; LLM decisions are slow and expensive.
  • So the pipeline filters early and escalates only exceptions.
  • Redis sits between fast producers and slow agents to absorb bursts.
  • Multi-agent vote reduces single-model decision brittleness.
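
Since the consensus rule is the piece that turns three independent opinions into one command, here is a minimal sketch of a 2-of-3 majority vote (role names come from the list above; the vote labels and the no-majority fallback are assumptions, as the real step runs inside the LangGraph flow in aetheris-agents):

from collections import Counter

def consensus(votes: dict[str, str]) -> str:
    # e.g. {"Analyst": "BLOCK", "Auditor": "BLOCK", "Strategist": "ALLOW"} -> "BLOCK"
    label, count = Counter(votes.values()).most_common(1)[0]
    return label if count >= 2 else "ESCALATE"   # assumed fallback when no 2/3 majority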

Build timeline (what changed, why it changed)

  1. Raw pub/sub sanity check
  • Published CSV rows to MQTT and subscribed to confirm transport.
  • Goal: prove ingest works before adding intelligence.
  2. Single-node anomaly detector
  • Added rolling Z-score logic to subscriber.
  • First issue: no anomalies at high threshold because synthetic data range was tight.
  • Fix: switched from global baseline to per-account history.
  3. Initial multi-node attempt
  • Ran multiple subscribers to mimic mesh behavior.
  • Issue: all nodes did duplicate work (redundancy, not parallelism).
  4. Workload partitioning iteration
  • Tried prefix/range splits, then modulo hash routing.
  • Learned: balance and state consistency matter more than "just more nodes".
  5. Node count reasoning
  • Discussed why not 100 nodes: broker fanout, coordination overhead, low marginal gain.
  • Settled on a practical small-node setup for this workload.
  6. Intelligence layer separation
  • Moved agents out of ingestion service into separate Python service.
  • Reason: clearer ownership, easier scaling, cleaner architecture.
  7. Prompt-only multi-agent -> real orchestrated flow
  • Replaced simple role prompts with structured orchestrator + tool execution pattern.
  • Added LangGraph state flow and explicit consensus step.
  8. Buffering and backpressure control
  • Direct processing showed queue lag under sustained anomaly volume.
  • Added Redis Streams with scoring and priority routing.
  9. Observability pass
  • Added periodic metrics to see queued vs processed and decision mix.
  • Reason: needed objective runtime visibility, not just logs.
  10. True semantic batching
  • Reworked agent flow so queues are drained in chunks and decisions are produced in batch.
  • Added max-wait windows to accumulate small bursts before inference.
  • Result: far fewer LLM API calls than per-transaction inference (see the drain-loop sketch after this timeline).
  11. Persistence and fail-fast operations
  • Persisted incoming exceptions and outgoing commands to Mongo for traceability.
  • Added tool-call audit logging with TTL cleanup to avoid unbounded growth.
  • Added health checks (MQTT, Redis, Mongo) at startup to fail fast on broken dependencies.
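
To make the semantic batching step concrete, here is a sketch of a drain-and-batch loop over a Redis Stream using redis-py: read up to a batch-size worth of entries, but never block past the max-wait window before handing the batch to the agents. The stream name, connection details, and the use of plain XREAD (rather than consumer groups) are assumptions; the repo's actual knob values are listed in the batching section below.

import time
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

BATCH_SIZE = 10      # drain up to this many entries per batch
MAX_WAIT_MS = 300    # never hold a batch open longer than this

def drain_batch(stream: str, last_id: str = "$") -> list[dict]:
    batch, deadline = [], time.monotonic() + MAX_WAIT_MS / 1000
    while len(batch) < BATCH_SIZE:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            break                                    # wait window expired
        resp = r.xread({stream: last_id},
                       count=BATCH_SIZE - len(batch),
                       block=remaining_ms)
        for _name, entries in resp:
            for entry_id, fields in entries:
                batch.append(fields)
                last_id = entry_id
    return batch   # one semantic batch -> a single LLM call per agent role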

Real problems faced

  • Stateful anomaly detection design: global Z-score looked mathematically fine but behaviorally wrong; per-account baselines were required.
  • Work partition correctness: ensuring the same account consistently maps to the same node to preserve rolling state.
  • Throughput mismatch: edge layer can emit faster than LLM layer can decide.
  • Backpressure and queue growth: even with healthy processing, backlog still grows under sustained input.
  • Priority policy tuning: choosing the suspicion threshold (immediate vs batch) changes cost/latency trade-offs (see the sketch after this list).
  • Node scaling trade-off: more nodes improved parallelism only up to broker/network overhead limits.
  • Batch tuning complexity: semantic batching works, but batch size and wait-window values still need workload-specific tuning.
  • Burst handling trade-off: short mini-spikes ("small bursts") cause too many tiny LLM calls if the wait windows are too short, or add latency if they are too long.
  • Explainability vs speed: richer multi-agent reasoning increases latency and operational cost.
  • Data realism: synthetic dataset distribution affects anomaly rates and threshold behavior.
  • Operational coordination: many moving pieces (EMQX, Redis, Mongo, Node services, Python service) increased runbook complexity.
  • Persistence hygiene: storing exceptions/commands/tool audits requires index and retention discipline.
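
To illustrate the priority-policy point above, a sketch of how a suspicion threshold could split traffic between the immediate and batch streams (the stream keys and the 0.8 threshold are made-up values for this sketch, not the repo's configuration):

import json
import redis

r = redis.Redis(decode_responses=True)
SUSPICION_THRESHOLD = 0.8   # lower -> more immediate (costlier, faster) escalations

def enqueue_exception(event: dict) -> None:
    stream = "aetheris:immediate" if event["score"] >= SUSPICION_THRESHOLD else "aetheris:batch"
    r.xadd(stream, {"payload": json.dumps(event)})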

Current batching values

  • IMMEDIATE_BATCH_SIZE: 5
  • BATCH_SIZE: 10
  • IMMEDIATE_BATCH_MAX_WAIT_MS: 150 ms
  • BATCH_MAX_WAIT_MS: 300 ms
  • Before wait-window tuning, queue reads used the default 1000 ms blocking read.
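
A sketch of how these knobs map to environment variables, with defaults mirroring the current values (the parsing code is illustrative, not copied from aetheris-agents):

import os

IMMEDIATE_BATCH_SIZE = int(os.getenv("IMMEDIATE_BATCH_SIZE", "5"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "10"))
IMMEDIATE_BATCH_MAX_WAIT_MS = int(os.getenv("IMMEDIATE_BATCH_MAX_WAIT_MS", "150"))
BATCH_MAX_WAIT_MS = int(os.getenv("BATCH_MAX_WAIT_MS", "300"))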

Current persistence values (Mongo)

  • MONGODB_TRANSACTIONS_COLLECTION=transactions
  • MONGODB_EXCEPTIONS_COLLECTION=exceptions
  • MONGODB_COMMANDS_COLLECTION=commands
  • MONGODB_TOOL_AUDIT_COLLECTION=tool_audit
  • MONGODB_AUDIT_TTL_DAYS=30
  • FAIL_FAST_HEALTHCHECK=true
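
A sketch of how the fail-fast health check and the audit TTL could be wired with pymongo and redis-py (the database name and the created_at timestamp field are assumptions, and the MQTT check is omitted here):

import os
import sys
import redis
from pymongo import MongoClient

def startup_checks() -> MongoClient:
    try:
        mongo = MongoClient(os.getenv("MONGODB_URI", "mongodb://localhost:27017"),
                            serverSelectionTimeoutMS=2000)
        mongo.admin.command("ping")      # Mongo reachable?
        redis.Redis().ping()             # Redis reachable?
    except Exception as exc:
        if os.getenv("FAIL_FAST_HEALTHCHECK", "true").lower() == "true":
            sys.exit(f"dependency health check failed: {exc}")   # fail fast at startup
        raise
    db = mongo[os.getenv("MONGODB_DATABASE", "aetheris")]        # database name assumed
    ttl_seconds = int(os.getenv("MONGODB_AUDIT_TTL_DAYS", "30")) * 86400
    db[os.getenv("MONGODB_TOOL_AUDIT_COLLECTION", "tool_audit")].create_index(
        "created_at", expireAfterSeconds=ttl_seconds             # TTL cleanup for tool audits
    )
    return mongo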

Repo layout

Aetheris/
  data/                    # synthetic finance dataset (50k+ rows)
  SRS.md                   # requirements/spec notes
  aetheris-edge-wasm/      # Rust edge scoring module compiled to Wasm
  aetheris-ingestion/      # Node.js stream + edge detection
  aetheris-agents/         # Python uv LangGraph intelligence layer

Tech stack

  • Edge/Ingestion: Node.js, MQTT.js, csv-parser
  • Edge compute option: Rust -> WebAssembly (wasm32-unknown-unknown)
  • Broker: EMQX
  • Intelligence: Python 3.11, uv, LangGraph, langchain-groq
  • Buffer/state: Redis Streams
  • Historical context tool: MongoDB

Why Wasm

  • It moves the hottest edge math path (Z-score scoring) to compiled Rust for lower CPU overhead.
  • It keeps the existing Node MQTT pipeline intact, so migration is incremental and low-risk.
  • It gives deterministic compute behavior across runtimes while preserving current topic contracts.

Note: Legacy Node-based agent prototypes were removed from aetheris-ingestion so that the only active integration path runs through aetheris-agents.

Run end-to-end

1) Start infrastructure

docker compose up -d

2) Start edge nodes

From aetheris-ingestion:

npm install
node subscribe.js -- --node-name=EdgeNode1 --node-count=5
node subscribe.js -- --node-name=EdgeNode2 --node-count=5
node subscribe.js -- --node-name=EdgeNode3 --node-count=5
node subscribe.js -- --node-name=EdgeNode4 --node-count=5
node subscribe.js -- --node-name=EdgeNode5 --node-count=5

2b) (Optional) Start Rust+Wasm edge nodes

Build Wasm first (from aetheris-ingestion):

npm run wasm:build

Then run Wasm-backed nodes:

npm run wasm:node1
npm run wasm:node2
npm run wasm:node3
npm run wasm:node4
npm run wasm:node5

3) Start intelligence service

From aetheris-agents:

uv sync
copy .env.example .env   # Windows; use cp .env.example .env on macOS/Linux
# set GROQ_API_KEY and GROQ_MODEL
# optional throughput knobs:
# IMMEDIATE_BATCH_SIZE=5
# BATCH_SIZE=10
# IMMEDIATE_BATCH_MAX_WAIT_MS=150
# BATCH_MAX_WAIT_MS=300
# optional persistence knobs:
# MONGODB_EXCEPTIONS_COLLECTION=exceptions
# MONGODB_COMMANDS_COLLECTION=commands
# MONGODB_TOOL_AUDIT_COLLECTION=tool_audit
# MONGODB_AUDIT_TTL_DAYS=30
# FAIL_FAST_HEALTHCHECK=true
uv run python main.py

For low-cost testing:

GROQ_MODEL=llama-3.1-8b-instant

4) Replay stream

From aetheris-ingestion:

node publish.js

Current limitations

  • Throughput is improved by semantic batching, but LLM RPM limits can still dominate under sustained spikes.
  • LLM inference latency remains the main throughput bottleneck.
  • The orchestrator is single-process in its current form.
  • The history-lookup tool and decision-audit path assume the Mongo collections already exist and are reachable.

Next upgrades

  1. Multiple Redis consumer workers
  2. Retry/backoff + dead-letter handling for 429/network failures
  3. Rule-based triage before LLM (top-risk-only escalation)
  4. Prometheus/Grafana instrumentation
  5. Rust/Wasm edge runtime migration path

Final note

This repo shows the full engineering thought process: build fast, measure bottlenecks, and refactor architecture where needed instead of forcing a one-shot design.

Data source

Thanks to this dataset for powering the project experiments: https://www.kaggle.com/datasets/testdatabox/finance-fraud-and-loans-dataset-testdatabox
