
Conversation

@ByteBaker ByteBaker (Contributor) commented Dec 1, 2025

Architecture

Replaces the in-memory EdgeStore with a daemon that periodically queries trace streams using DataFusion SQL.

Flow:

  1. Daemon runs in the compactor every hour by default (configurable)
  2. Queries all trace streams for last 60 minutes (configurable)
  3. SQL aggregation with CTE extracts client/server edges
  4. Calculates p50/p95/p99 percentiles using approx functions
  5. Writes aggregated edges to _o2_service_graph internal stream
  6. Topology API queries _o2_service_graph for last 60 minutes

Zero impact on trace ingestion - completely decoupled.
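
A minimal sketch of that daemon loop in Rust, with hypothetical helper names (get_trace_streams, process_stream) standing in for the actual functions in this PR:

use std::time::Duration;

// Stub standing in for the real stream lookup in this PR:
// returns (org, stream_name) pairs for every trace stream.
async fn get_trace_streams() -> Vec<(String, String)> {
    vec![]
}

// Stub: run the CTE query below for [start, end) and write the
// aggregated edges to _o2_service_graph.
async fn process_stream(org: &str, stream: &str, start: i64, end: i64) -> Result<(), String> {
    let _ = (org, stream, start, end);
    Ok(())
}

async fn service_graph_daemon(interval_secs: u64, window_mins: i64) {
    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
    loop {
        ticker.tick().await;
        let end = chrono::Utc::now().timestamp_micros();
        let start = end - window_mins * 60 * 1_000_000;
        for (org, stream) in get_trace_streams().await {
            // Per-stream error handling: one failing stream must not block the rest.
            if let Err(e) = process_stream(&org, &stream, start, end).await {
                log::error!("service graph failed for {org}/{stream}: {e}");
            }
        }
    }
}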

SQL Query

WITH edges AS (
  SELECT
    CASE WHEN CAST(span_kind AS VARCHAR) = '3' THEN service_name ELSE peer_service END as client,
    CASE WHEN CAST(span_kind AS VARCHAR) = '3' THEN peer_service ELSE service_name END as server,
    end_time - start_time as duration,
    span_status
  FROM "trace_stream"
  WHERE _timestamp >= {start} AND _timestamp < {end}
    AND CAST(span_kind AS VARCHAR) IN ('2', '3')
    AND peer_service IS NOT NULL
)
SELECT
  client, server,
  COUNT(*) as total_requests,
  COUNT(*) FILTER (WHERE span_status = 'ERROR') as errors,
  approx_median(duration) as p50,
  approx_percentile_cont(duration, 0.95) as p95,
  approx_percentile_cont(duration, 0.99) as p99
FROM edges
GROUP BY client, server
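
Here {start} and {end} are the query window bounds in epoch microseconds (the resolution of _timestamp). Purely for illustration, running such a query with the datafusion crate directly might look like the sketch below; the table registration is an assumption, not OpenObserve's actual search path:

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

// Register a parquet directory as "trace_stream" and run the
// aggregation; the table path and session setup are illustrative.
async fn run_edge_query(sql: &str) -> datafusion::error::Result<Vec<RecordBatch>> {
    let ctx = SessionContext::new();
    ctx.register_parquet("trace_stream", "traces/", ParquetReadOptions::default())
        .await?;
    ctx.sql(sql).await?.collect().await
}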

Configuration

  • O2_SERVICE_GRAPH_ENABLED=true - Enables the feature (default: false)
  • O2_SERVICE_GRAPH_PROCESSING_INTERVAL_SECS=3600 - How often daemon runs (default: 1 hour)
  • O2_SERVICE_GRAPH_QUERY_TIME_RANGE_MINUTES=60 - Query window size (default: 60 min)
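
A sketch of how these variables could map onto a config struct; the struct and helper are hypothetical, not OpenObserve's actual config plumbing:

// Hypothetical helper: parse a u64 env var, falling back to a default.
fn env_u64(key: &str, default: u64) -> u64 {
    std::env::var(key)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

struct ServiceGraphConfig {
    enabled: bool,
    processing_interval_secs: u64, // how often the daemon runs
    query_time_range_minutes: u64, // size of each query window
}

impl ServiceGraphConfig {
    fn from_env() -> Self {
        Self {
            enabled: std::env::var("O2_SERVICE_GRAPH_ENABLED")
                .map(|v| v == "true")
                .unwrap_or(false),
            processing_interval_secs: env_u64("O2_SERVICE_GRAPH_PROCESSING_INTERVAL_SECS", 3600),
            query_time_range_minutes: env_u64("O2_SERVICE_GRAPH_QUERY_TIME_RANGE_MINUTES", 60),
        }
    }
}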

Changes

Added:

  • processor.rs (202 lines) - SQL daemon
  • aggregator.rs (155 lines) - Stream writer (record shape sketched at the end of this section)

Modified:

  • api.rs - Query from _o2_service_graph stream, return availableStreams
  • compactor.rs - Integrate service_graph_processor job
  • traces/mod.rs - Remove ALL inline processing

Deleted:

  • job/service_graph.rs - Standalone daemon (now in compactor)
  • In-memory EdgeStore, buffering, workers

Enterprise dependency:

  • Requires companion PR in o2-enterprise (business logic migration)
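
To make the aggregator.rs side concrete: one aggregated edge record written to _o2_service_graph might look roughly like the sketch below, with field names inferred from the SQL output columns rather than taken from the actual schema:

use serde_json::{json, Value};

// Hypothetical record shape, inferred from the SQL output columns;
// the real aggregator.rs maps to the actual storage schema.
fn edge_record(
    client: &str,
    server: &str,
    total_requests: u64,
    errors: u64,
    p50: f64,
    p95: f64,
    p99: f64,
    query_end_us: i64,
) -> Value {
    json!({
        "_timestamp": query_end_us, // edge timestamp = query end time
        "client": client,
        "server": server,
        "total_requests": total_requests,
        "errors": errors,
        "p50": p50,
        "p95": p95,
        "p99": p99,
    })
}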

Technical Details

  • Span kinds: '2' (SERVER), '3' (CLIENT) stored as VARCHAR in parquet
  • Edge timestamp: Uses query end time for API compatibility
  • Feature-gated: Stubs for non-enterprise builds (sketched below)
  • Query timeout: 300s for large trace volumes
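
A hedged sketch of that feature gating; the function name and module layout are illustrative, not the exact code in this PR:

#[cfg(feature = "enterprise")]
pub async fn process_service_graph() {
    // Enterprise build: run the SQL daemon described above
    // (query trace streams, write edges to _o2_service_graph).
}

#[cfg(not(feature = "enterprise"))]
pub async fn process_service_graph() {
    // Non-enterprise build: no-op stub so callers compile unchanged.
}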

@ByteBaker ByteBaker requested a review from oasisk December 1, 2025 09:53
@ByteBaker ByteBaker changed the title from "Service Graph: SQL-based daemon architecture" to "feat: service graph through streams" on Dec 1, 2025
@github-actions github-actions bot (Contributor) commented Dec 1, 2025

Failed to generate code suggestions for PR

@greptile-apps greptile-apps bot (Contributor) commented Dec 1, 2025

Greptile Overview

Greptile Summary

Replaces in-memory service graph processing with a SQL-based daemon architecture that runs in the compactor every hour, completely decoupling the service graph from trace ingestion.

Major Changes:

  • Added processor.rs (202 lines) - SQL daemon that queries trace streams using DataFusion with CTE for client/server edge extraction and percentile calculations
  • Added aggregator.rs (155 lines) - Writes SQL-aggregated edges to _o2_service_graph internal stream via bulk ingestion
  • Modified api.rs - Queries _o2_service_graph stream for last 60 minutes, returns availableStreams for filtering
  • Modified compactor.rs - Integrated service_graph_processor job with configurable interval
  • Modified traces/mod.rs - Removed ALL inline processing during trace ingestion (lines 398-399)
  • Frontend updated to support stream filtering via query params

Architecture Benefits:

  • Zero impact on trace ingestion performance
  • SQL aggregation leverages DataFusion for efficient processing
  • Configurable processing interval (default: 1 hour) and query window (default: 60 min)
  • Graceful error handling per stream (failures don't block other streams)

Configuration:

  • O2_SERVICE_GRAPH_ENABLED=true
  • O2_SERVICE_GRAPH_PROCESSING_INTERVAL_SECS=3600
  • O2_SERVICE_GRAPH_QUERY_TIME_RANGE_MINUTES=60

Confidence Score: 4/5

  • Safe to merge with attention to SQL query assumptions and companion PR requirement
  • Clean architectural refactor with proper error handling and feature gating. Score reflects: (1) SQL query assumes peer_service field exists in trace schema, (2) requires companion o2-enterprise PR for business logic, (3) no validation on stream_filter query parameter for SQL injection prevention
  • Pay close attention to src/service/traces/service_graph/processor.rs - SQL query assumes peer_service field exists

Important Files Changed

File Analysis

  • src/service/traces/service_graph/processor.rs (4/5): New SQL daemon queries trace streams periodically and uses DataFusion SQL for aggregation; properly handles errors per stream
  • src/service/traces/service_graph/aggregator.rs (5/5): Transforms SQL results to the storage schema and writes to _o2_service_graph via bulk ingestion; field mapping is correct
  • src/service/traces/service_graph/api.rs (4/5): Queries the _o2_service_graph stream for the last 60 minutes, handles stream non-existence gracefully, returns availableStreams
  • src/job/compactor.rs (5/5): Added the service_graph_processor job running every processing_interval_secs with error handling
  • src/service/traces/mod.rs (5/5): Removed ALL inline service graph processing from the trace ingestion path; only a comment remains at lines 398-399

Sequence Diagram

sequenceDiagram
    participant Client as Frontend Client
    participant API as Service Graph API
    participant Compactor as Compactor Job
    participant Processor as SQL Processor
    participant DataFusion as DataFusion Query Engine
    participant TraceStream as Trace Streams
    participant ServiceGraphStream as _o2_service_graph Stream
    
    Note over Compactor,Processor: Daemon runs every processing_interval_secs (default: 3600s)
    
    Compactor->>Processor: process_service_graph()
    Processor->>Processor: get_trace_streams() - List all orgs & trace streams
    
    loop For each trace stream
        Processor->>DataFusion: Execute SQL with CTE
        Note right of DataFusion: WITH edges AS (<br/>  SELECT client, server, duration, span_status<br/>  WHERE span_kind IN ('2','3')<br/>  AND peer_service IS NOT NULL<br/>)<br/>SELECT COUNT(*), percentiles<br/>GROUP BY client, server
        DataFusion->>TraceStream: Query last 60min of traces
        TraceStream-->>DataFusion: Span records
        DataFusion-->>Processor: Aggregated edges (p50/p95/p99)
        Processor->>ServiceGraphStream: write_sql_aggregated_edges()
        Note right of ServiceGraphStream: Bulk ingest via logs API<br/>to _o2_service_graph
    end
    
    Note over Client,API: User queries topology (independent of daemon)
    
    Client->>API: GET /topology/current?stream_name=X
    API->>ServiceGraphStream: SQL query last 60min
    ServiceGraphStream-->>API: Edge records
    API->>API: build_topology() - Convert to nodes/edges
    API-->>Client: {nodes, edges, availableStreams}

@greptile-apps greptile-apps bot (Contributor) left a comment

20 files reviewed, no comments

Replace in-memory EdgeStore with daemon-based architecture that periodically
queries trace streams using SQL aggregation in DataFusion.

Architecture:
- Daemon runs in the compactor job every hour (default, configurable)
- Queries last 60 minutes of trace data (configurable window)
- Uses DataFusion SQL with CTE for efficient aggregation
- Calculates p50, p95, p99 percentiles using approx functions
- Writes aggregated edges to internal `_o2_service_graph` stream
- Zero impact on trace ingestion performance

Key Changes:
- Add `processor.rs` with SQL-based daemon that queries trace streams
- Add `aggregator.rs` to write SQL results to `_o2_service_graph` stream (uses json! macro)
- Refactor `api.rs` to query pre-aggregated data from stream
- Move business logic to enterprise repo (`build_topology()`, `span_to_graph_span()`)
- Remove ALL inline processing during trace ingestion
- Delete in-memory EdgeStore, worker threads, and buffering code
- Feature-gated for enterprise builds with non-enterprise stubs

SQL Query:
- CTE extracts client/server from span_kind ('2'=SERVER, '3'=CLIENT)
- Aggregates by (client, server) with COUNT, percentiles, error rate
- Filters: span_kind IN ('2','3') AND peer_service IS NOT NULL

Configuration:
- O2_SERVICE_GRAPH_ENABLED (default: false)
- O2_SERVICE_GRAPH_PROCESSING_INTERVAL_SECS (default: 3600) - Daemon run frequency
- O2_SERVICE_GRAPH_QUERY_TIME_RANGE_MINUTES (default: 60) - Query window size

Technical Details:
- Stream name: `_o2_service_graph` (internal stream)
- Edge timestamp: Uses query end time for API compatibility
- Span kinds: '2' (SERVER), '3' (CLIENT) as VARCHAR in parquet
- Percentiles: approx_median(), approx_percentile_cont(0.95), approx_percentile_cont(0.99)
@ByteBaker ByteBaker merged commit ada8085 into main Dec 1, 2025
37 of 38 checks passed
@ByteBaker ByteBaker deleted the feat/sgraph-improv branch December 1, 2025 11:36