@hengfeiyang hengfeiyang commented Dec 4, 2025

This PR changes the bulk API so that it only collects data into a HashMap<Stream, Vec<Value>> and then calls the service/logs/ingest/ingest function to write the data stream by stream. As a result, all log ingestion goes through the same function, and each API becomes a thin interface that only converts incoming data into json::Value records grouped by stream. The code is much simpler now.

This PR also removes some string copies, which improves ingestion performance by about 3%.
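
The collect-then-delegate shape described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `Value` is a `String` stand-in for `json::Value`, and `group_by_stream`, `ingest`, and `ingest_all` are hypothetical names with simplified signatures.

```rust
use std::collections::HashMap;

/// Stand-in for the JSON value type; the real code uses `json::Value`.
type Value = String;

/// Hypothetical helper: group `(stream, record)` pairs by stream name,
/// which is the only job left to the bulk handler in this design.
fn group_by_stream<I>(records: I) -> HashMap<String, Vec<Value>>
where
    I: IntoIterator<Item = (String, Value)>,
{
    let mut grouped: HashMap<String, Vec<Value>> = HashMap::new();
    for (stream, record) in records {
        // One entry per stream; records keep their arrival order.
        grouped.entry(stream).or_default().push(record);
    }
    grouped
}

/// Hypothetical stand-in for the shared ingest function: it only reports
/// how many records it received for the stream.
fn ingest(stream: &str, records: Vec<Value>) -> (String, usize) {
    (stream.to_string(), records.len())
}

/// Hand each stream's batch to the shared ingest path.
fn ingest_all(grouped: HashMap<String, Vec<Value>>) -> Vec<(String, usize)> {
    let mut results: Vec<_> = grouped
        .into_iter()
        .map(|(stream, records)| ingest(&stream, records))
        .collect();
    // HashMap iteration order is unspecified, so sort for stable output.
    results.sort();
    results
}
```

Grouping first means the shared function is called once per stream rather than once per record, so per-stream work such as schema lookup can be done once per batch.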

github-actions bot commented Dec 4, 2025

Failed to generate code suggestions for PR

@hengfeiyang hengfeiyang changed the title from "refact: unify bulk API to use the same ingestion function" to "refactor: unify bulk API to use the same ingestion function" Dec 4, 2025

greptile-apps bot commented Dec 4, 2025

Greptile Overview

Greptile Summary

This PR successfully unifies the bulk API and other log ingestion endpoints to use a single ingest() function, significantly simplifying the codebase by removing ~780 lines while adding only ~600 lines (net reduction of ~180 lines). The refactoring consolidates duplicated logic across different ingestion APIs (bulk, HEC, Loki) into a unified pipeline.

Key Changes:

  • Refactored bulk API to collect data into HashMap<Stream, Vec<Value>> and delegate processing to the unified ingest() function
  • Updated IngestionRequest enum to support JsonValues variant with different types (Bulk, Hec, Loki)
  • Optimized format_stream_name() to take ownership and use Cow to avoid unnecessary string allocations
  • Modified parse_bulk_index() to return string slices instead of owned strings
  • Added pre-calculated timestamp thresholds in config to avoid repeated duration calculations
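
As a rough illustration of the `Cow`-based idea in the list above, the sketch below takes an owned `String` and hands the same allocation back when nothing needs replacing. It is written under assumptions: the character rule (ASCII alphanumerics and `_` are kept, everything else becomes `_`) and the helpers `is_allowed` and `sanitize` are hypothetical, and the real `format_stream_name()` may differ.

```rust
use std::borrow::Cow;

/// Assumed rule for this sketch: keep ASCII alphanumerics and `_`.
fn is_allowed(c: char) -> bool {
    c.is_ascii_alphanumeric() || c == '_'
}

/// Hypothetical helper: borrow the input when it is already clean,
/// allocate a new string only when a character must be replaced.
fn sanitize(name: &str) -> Cow<'_, str> {
    if name.chars().all(is_allowed) {
        Cow::Borrowed(name)
    } else {
        let fixed = name
            .chars()
            .map(|c| if is_allowed(c) { c } else { '_' })
            .collect::<String>();
        Cow::Owned(fixed)
    }
}

/// Takes ownership so the caller's allocation can be reused unchanged.
fn format_stream_name(name: String) -> String {
    // Reduce the `Cow` to an `Option<String>` first, so the borrow of
    // `name` has ended before `name` itself is moved out.
    let replaced = match sanitize(&name) {
        Cow::Borrowed(_) => None,
        Cow::Owned(s) => Some(s),
    };
    replaced.unwrap_or(name)
}
```

Converting the `Cow` into an `Option<String>` before returning is one way to keep the borrow and the move of `name` clearly separated.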

Performance Improvements:
The PR achieves the claimed ~3% ingestion performance improvement through several optimizations that cut string allocations and repeated calculations:

  1. format_stream_name() now avoids cloning when no changes are needed
  2. parse_bulk_index() returns borrowed string slices instead of owned strings
  3. Pre-calculated timestamp thresholds reduce repeated duration calculations
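
The second point, returning borrowed slices instead of owned strings, might look roughly like this. The sketch is an assumption-heavy simplification: `ActionLine` is a hypothetical nested-map stand-in for the parsed JSON action line, and the real `parse_bulk_index()` signature and return type may differ.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed bulk action line such as
/// `{"index": {"_index": "app", "_id": "1"}}`.
type ActionLine = HashMap<String, HashMap<String, String>>;

/// Returns `(action, index, doc_id)` as slices borrowed from `line`,
/// so nothing is cloned while parsing.
fn parse_bulk_index(line: &ActionLine) -> Option<(&str, &str, Option<&str>)> {
    // A bulk action line carries a single top-level key: the action name.
    let (action, meta) = line.iter().next()?;
    let index = meta.get("_index")?;
    let doc_id = meta.get("_id").map(String::as_str);
    Some((action.as_str(), index.as_str(), doc_id))
}
```

The returned slices live as long as `line`, so the caller only pays for an allocation if it actually needs to keep a value beyond that.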

Code Quality:

  • Excellent test coverage added for the refactored functions
  • Clean separation of concerns with the unified ingestion pathway
  • All log ingestion APIs now follow the same code path, improving maintainability

Confidence Score: 4/5

  • This refactoring is well-structured and safe to merge with minor considerations
  • The refactoring successfully unifies ingestion logic with good test coverage and clear performance benefits. The score reflects one previously identified issue in src/config/src/utils/schema.rs:294 where the function attempts to return stream_name after it was already moved/borrowed in the Cow::Borrowed case, which may cause compilation issues. The core logic is sound and the changes are well-tested across multiple ingestion paths.
  • Pay special attention to src/config/src/utils/schema.rs - the format_stream_name() function has a potential ownership issue in the Cow::Borrowed branch that was already noted in previous review threads

Important Files Changed

File Analysis

| Filename | Score | Overview |
|---|---|---|
| src/service/logs/bulk.rs | 4/5 | Refactored bulk API to collect data into HashMap<Stream, Vec<Value>> and delegate processing to the unified ingest() function, significantly simplifying the logic by removing ~500 lines of duplicated code |
| src/service/logs/ingest.rs | 4/5 | Enhanced unified ingestion function to handle various request types (JsonValues with Bulk, Hec, Loki variants) through the IngestionRequest enum, centralizing all log ingestion processing |
| src/config/src/utils/schema.rs | 3/5 | Optimized format_stream_name() to take ownership and avoid string cloning by leveraging Cow to detect whether replacements occurred, reducing unnecessary allocations |
| src/service/logs/mod.rs | 5/5 | Modified parse_bulk_index() to return string slices instead of owned String values, eliminating unnecessary string allocations during bulk request parsing |
| src/common/meta/ingestion.rs | 5/5 | Added IngestionValueType enum and updated IngestionRequest to support JsonValues variant for different API types, enabling unified handling of bulk, HEC, and Loki ingestion |
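
To make the `IngestionRequest` / `IngestionValueType` relationship concrete, here is a minimal sketch. Only the names `IngestionRequest`, `IngestionValueType`, `JsonValues`, `Bulk`, `Hec`, and `Loki` come from the review above; the `String` stand-in for `json::Value`, the `IngestionSummary` struct, and the `ingest` signature are assumptions made for illustration, and the real enum has other variants that are omitted here.

```rust
/// Stand-in for `json::Value`.
type Value = String;

/// Which API produced the already-parsed JSON records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IngestionValueType {
    Bulk,
    Hec,
    Loki,
}

/// Owned request type; only the variant discussed here is modelled.
#[derive(Debug)]
enum IngestionRequest {
    JsonValues(IngestionValueType, Vec<Value>),
}

/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq, Eq)]
struct IngestionSummary {
    source: IngestionValueType,
    accepted: usize,
}

/// Single entry point: every API converts its payload into an
/// `IngestionRequest` and calls this function.
fn ingest(stream: &str, request: IngestionRequest) -> Result<IngestionSummary, String> {
    if stream.is_empty() {
        return Err("stream name is empty".to_string());
    }
    match request {
        IngestionRequest::JsonValues(source, records) => Ok(IngestionSummary {
            source,
            accepted: records.len(),
        }),
    }
}
```

Carrying the source type alongside the records lets the shared path keep any API-specific behaviour behind a single `match` instead of separate code paths per endpoint.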

Sequence Diagram

sequenceDiagram
    participant Client
    participant BulkAPI as Bulk API Handler
    participant IngestFn as Unified Ingest Function
    participant Pipeline as Pipeline Processor
    participant WriteLog as Write Logs Function
    participant WAL as Write-Ahead Log

    Client->>BulkAPI: POST /_bulk (raw logs)
    
    Note over BulkAPI: Parse bulk format<br/>line-by-line
    
    loop For each data line
        BulkAPI->>BulkAPI: Parse stream name & doc_id
        BulkAPI->>BulkAPI: Validate timestamp
        BulkAPI->>BulkAPI: Check stream not blocked
        BulkAPI->>BulkAPI: Collect to HashMap<Stream, Vec<Value>>
    end
    
    Note over BulkAPI: Group by stream name
    
    loop For each stream
        BulkAPI->>IngestFn: ingest(IngestionRequest::JsonValues(Bulk, records))
        
        Note over IngestFn: Single unified ingestion path<br/>for all log APIs
        
        IngestFn->>IngestFn: Format stream name
        IngestFn->>IngestFn: Check ingestion allowed
        IngestFn->>IngestFn: Get pipeline & schema settings
        
        alt Has Pipeline
            loop For each record
                IngestFn->>IngestFn: Flatten JSON
                IngestFn->>IngestFn: Handle timestamp
                IngestFn->>Pipeline: Buffer for batch processing
            end
            IngestFn->>Pipeline: process_batch()
            Pipeline-->>IngestFn: Transformed records by destination stream
        else No Pipeline
            loop For each record
                IngestFn->>IngestFn: Flatten JSON
                IngestFn->>IngestFn: Handle timestamp
                IngestFn->>IngestFn: Apply user-defined schema
                IngestFn->>IngestFn: Add _original & _all_values if needed
                IngestFn->>IngestFn: Group by hour key
            end
        end
        
        IngestFn->>WriteLog: write_logs_by_stream()
        
        loop For each stream
            WriteLog->>WriteLog: Get stream schema & settings
            WriteLog->>WriteLog: Check for schema evolution
            WriteLog->>WriteLog: Cast types & validate records
            WriteLog->>WriteLog: Evaluate alerts
            WriteLog->>WriteLog: Collect distinct values
            WriteLog->>WAL: write_file()
            WAL-->>WriteLog: Request stats
            WriteLog->>WriteLog: Report usage statistics
        end
        
        WriteLog-->>IngestFn: Success
        IngestFn-->>BulkAPI: IngestionResponse with items
    end
    
    BulkAPI->>BulkAPI: Aggregate all responses
    BulkAPI-->>Client: BulkResponse with all items

@greptile-apps greptile-apps bot left a comment

20 files reviewed, 1 comment

Copilot AI left a comment

Pull request overview

This PR refactors the bulk API ingestion to unify all log ingestion paths through a single service/logs/ingest/ingest function. The refactoring reduces code duplication, simplifies the bulk API to act as a data collection interface, and improves performance by reducing string allocations (approximately 3% improvement).

Key changes include:

  • Unified ingestion flow where bulk API collects data into HashMap<Stream, Vec<Value>> and delegates to the common ingest function
  • Removed lifetimes from IngestionRequest and related types, transitioning to owned values for simpler code
  • Optimized format_stream_name to avoid unnecessary allocations when no changes are needed
  • Precomputed timestamp validation bounds (ingest_allowed_upto_micro and ingest_allowed_in_future_micro) to avoid repeated calculations
  • Enhanced timestamp handling to skip unnecessary JSON updates when timestamps are already valid
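
The precomputed-bounds item can be sketched as below. The field names `ingest_allowed_upto_micro` and `ingest_allowed_in_future_micro` come from the review; the `Limit` struct, its constructor taking hours, and `timestamp_in_window` are hypothetical and only illustrate computing the bounds once rather than per record.

```rust
const MICROS_PER_HOUR: i64 = 3_600_000_000;

/// Hypothetical config holder: the hour settings are converted to
/// microseconds once, when the config is built.
struct Limit {
    ingest_allowed_upto_micro: i64,
    ingest_allowed_in_future_micro: i64,
}

impl Limit {
    /// Assumes both settings are expressed in hours.
    fn new(upto_hours: i64, in_future_hours: i64) -> Self {
        Limit {
            ingest_allowed_upto_micro: upto_hours * MICROS_PER_HOUR,
            ingest_allowed_in_future_micro: in_future_hours * MICROS_PER_HOUR,
        }
    }
}

/// Per-record check: two integer comparisons, no duration arithmetic.
fn timestamp_in_window(ts_micro: i64, now_micro: i64, limit: &Limit) -> bool {
    let min_ts = now_micro - limit.ingest_allowed_upto_micro;
    let max_ts = now_micro + limit.ingest_allowed_in_future_micro;
    ts_micro >= min_ts && ts_micro <= max_ts
}
```

With the bounds stored as plain integers, the hot path avoids rebuilding duration values for every record in a batch.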

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
|---|---|
| src/service/logs/bulk.rs | Complete refactor: simplified to collect data by stream and delegate to unified ingest function, removing pipeline execution and schema handling from this layer |
| src/service/logs/ingest.rs | Updated to handle new IngestionRequest variants including JsonValues with IngestionValueType enum; optimized timestamp handling to avoid unnecessary updates |
| src/common/meta/ingestion.rs | Removed lifetimes from IngestionRequest and IngestionData enums, transitioning to owned types; added IngestionValueType enum for different JSON value sources |
| src/config/src/utils/time.rs | Enhanced parse_timestamp_micro_from_value to return tuple (timestamp, is_valid) indicating if timestamp needs conversion; added extensive test cases (with duplicates) |
| src/config/src/utils/schema.rs | Optimized format_stream_name to take owned String and avoid reallocations when no changes needed; added comprehensive tests |
| src/config/src/config.rs | Removed inverted_index_camel_case_tokenizer_disabled config; added precomputed ingest_allowed_upto_micro and ingest_allowed_in_future_micro fields |
| src/service/db/organization.rs | Added specialized get_org_setting_toggle_ingestion_logs function to avoid cloning entire settings object |
| src/service/logs/*.rs | Updated all log ingestion endpoints (otlp, loki, hec, patterns) to use owned IngestionRequest types |
| src/service/metrics/*.rs | Changed format_stream_name calls to pass owned String values |
| src/handler/http/request/*.rs | Updated ingestion handlers to pass owned bytes/requests instead of references |
| src/config/src/utils/tantivy/tokenizer/*.rs | Simplified tokenizer to always use O2Tokenizer (camel case aware), removing conditional SimpleTokenizer fallback |

@hengfeiyang hengfeiyang marked this pull request as draft December 4, 2025 07:19
@hengfeiyang hengfeiyang marked this pull request as ready for review December 4, 2025 08:16

github-actions bot commented Dec 4, 2025

Failed to generate code suggestions for PR

@greptile-apps greptile-apps bot left a comment

20 files reviewed, no comments

@hengfeiyang hengfeiyang merged commit 04b15af into main Dec 4, 2025
53 of 55 checks passed
@hengfeiyang hengfeiyang deleted the perf/ingestion-bulk branch December 4, 2025 10:22