refactor: unify bulk API to use the same ingestion function #9467
Greptile Overview
Greptile Summary: This PR unifies the bulk API and the other log ingestion endpoints to use a single ingestion function.
Confidence Score: 4/5
Sequence Diagram
```mermaid
sequenceDiagram
participant Client
participant BulkAPI as Bulk API Handler
participant IngestFn as Unified Ingest Function
participant Pipeline as Pipeline Processor
participant WriteLog as Write Logs Function
participant WAL as Write-Ahead Log
Client->>BulkAPI: POST /_bulk (raw logs)
Note over BulkAPI: Parse bulk format<br/>line-by-line
loop For each data line
BulkAPI->>BulkAPI: Parse stream name & doc_id
BulkAPI->>BulkAPI: Validate timestamp
BulkAPI->>BulkAPI: Check stream not blocked
BulkAPI->>BulkAPI: Collect to HashMap<Stream, Vec<Value>>
end
Note over BulkAPI: Group by stream name
loop For each stream
BulkAPI->>IngestFn: ingest(IngestionRequest::JsonValues(Bulk, records))
Note over IngestFn: Single unified ingestion path<br/>for all log APIs
IngestFn->>IngestFn: Format stream name
IngestFn->>IngestFn: Check ingestion allowed
IngestFn->>IngestFn: Get pipeline & schema settings
alt Has Pipeline
loop For each record
IngestFn->>IngestFn: Flatten JSON
IngestFn->>IngestFn: Handle timestamp
IngestFn->>Pipeline: Buffer for batch processing
end
IngestFn->>Pipeline: process_batch()
Pipeline-->>IngestFn: Transformed records by destination stream
else No Pipeline
loop For each record
IngestFn->>IngestFn: Flatten JSON
IngestFn->>IngestFn: Handle timestamp
IngestFn->>IngestFn: Apply user-defined schema
IngestFn->>IngestFn: Add _original & _all_values if needed
IngestFn->>IngestFn: Group by hour key
end
end
IngestFn->>WriteLog: write_logs_by_stream()
loop For each stream
WriteLog->>WriteLog: Get stream schema & settings
WriteLog->>WriteLog: Check for schema evolution
WriteLog->>WriteLog: Cast types & validate records
WriteLog->>WriteLog: Evaluate alerts
WriteLog->>WriteLog: Collect distinct values
WriteLog->>WAL: write_file()
WAL-->>WriteLog: Request stats
WriteLog->>WriteLog: Report usage statistics
end
WriteLog-->>IngestFn: Success
IngestFn-->>BulkAPI: IngestionResponse with items
end
BulkAPI->>BulkAPI: Aggregate all responses
BulkAPI-->>Client: BulkResponse with all items
```
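The collection step in the first loop above is the heart of the refactor: the bulk handler no longer runs pipeline or schema logic itself; it only groups documents by stream and hands each group to the shared ingest function. Below is a minimal sketch of that shape, not the actual OpenObserve code: the helper names, the synchronous signature, and the error type are simplified stand-ins for what lives in src/service/logs/bulk.rs and src/service/logs/ingest.rs.

```rust
use std::collections::HashMap;

use serde_json::Value;

// Simplified stand-ins for the real request/response types (the real enums
// live in src/common/meta/ingestion.rs and carry more variants and fields).
enum IngestionValueType {
    Bulk,
}
enum IngestionRequest {
    JsonValues(IngestionValueType, Vec<Value>),
}
struct IngestionResponse {
    items: Vec<Value>,
}

// Stand-in for the unified ingest function; the real one is async and also
// takes org/user context.
fn ingest(_stream: &str, _req: IngestionRequest) -> Result<IngestionResponse, String> {
    Ok(IngestionResponse { items: vec![] })
}

/// Parse an Elasticsearch-style _bulk body (action line, then document line),
/// collect documents per stream, then delegate each group to the unified
/// ingest function.
fn bulk_ingest(body: &str) -> Result<Vec<IngestionResponse>, String> {
    let mut by_stream: HashMap<String, Vec<Value>> = HashMap::new();
    let mut lines = body.lines().filter(|l| !l.trim().is_empty());

    while let Some(action_line) = lines.next() {
        // e.g. {"index": {"_index": "default", "_id": "1"}}
        let action: Value = serde_json::from_str(action_line).map_err(|e| e.to_string())?;
        let stream = action["index"]["_index"]
            .as_str()
            .unwrap_or("default")
            .to_string();
        let doc_line = lines.next().ok_or("missing document line")?;
        let doc: Value = serde_json::from_str(doc_line).map_err(|e| e.to_string())?;
        by_stream.entry(stream).or_default().push(doc);
    }

    // One call into the shared ingestion path per stream; each call returns the
    // per-item results that get aggregated into the final bulk response.
    by_stream
        .into_iter()
        .map(|(stream, records)| {
            ingest(
                &stream,
                IngestionRequest::JsonValues(IngestionValueType::Bulk, records),
            )
        })
        .collect()
}
```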
20 files reviewed, 1 comment
Pull request overview
This PR refactors the bulk API ingestion to unify all log ingestion paths through a single service/logs/ingest/ingest function. The refactoring reduces code duplication, simplifies the bulk API to act as a data collection interface, and improves performance by reducing string allocations (approximately 3% improvement).
Key changes include:
- Unified ingestion flow where the bulk API collects data into `HashMap<Stream, Vec<Value>>` and delegates to the common ingest function
- Removed lifetimes from `IngestionRequest` and related types, transitioning to owned values for simpler code
- Optimized `format_stream_name` to avoid unnecessary allocations when no changes are needed (see the sketch after this list)
- Precomputed timestamp validation bounds (`ingest_allowed_upto_micro` and `ingest_allowed_in_future_micro`) to avoid repeated calculations
- Enhanced timestamp handling to skip unnecessary JSON updates when timestamps are already valid
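To ground the `format_stream_name` point above: the function now takes the name by value and returns it untouched when it is already in canonical form, so the common case allocates nothing new. This is a sketch under an assumed rule set (lowercase ASCII letters, digits, and underscores); the exact rules live in src/config/src/utils/schema.rs.

```rust
/// Sketch of the allocation-avoiding shape of format_stream_name.
/// Assumed canonical form: only [a-z0-9_]; anything else is rewritten.
fn format_stream_name(name: String) -> String {
    let needs_change = name
        .chars()
        .any(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_'));
    if !needs_change {
        // Fast path: reuse the caller's allocation instead of rebuilding the string.
        return name;
    }
    name.chars()
        .map(|c| if c.is_ascii_alphanumeric() { c.to_ascii_lowercase() } else { '_' })
        .collect()
}

fn main() {
    assert_eq!(format_stream_name("default".to_string()), "default");
    assert_eq!(format_stream_name("My-Stream 01".to_string()), "my_stream_01");
}
```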
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/service/logs/bulk.rs | Complete refactor: simplified to collect data by stream and delegate to unified ingest function, removing pipeline execution and schema handling from this layer |
| src/service/logs/ingest.rs | Updated to handle new IngestionRequest variants including JsonValues with IngestionValueType enum; optimized timestamp handling to avoid unnecessary updates |
| src/common/meta/ingestion.rs | Removed lifetimes from IngestionRequest and IngestionData enums, transitioning to owned types; added IngestionValueType enum for different JSON value sources |
| src/config/src/utils/time.rs | Enhanced parse_timestamp_micro_from_value to return tuple (timestamp, is_valid) indicating if timestamp needs conversion; added extensive test cases (with duplicates) |
| src/config/src/utils/schema.rs | Optimized format_stream_name to take owned String and avoid reallocations when no changes needed; added comprehensive tests |
| src/config/src/config.rs | Removed inverted_index_camel_case_tokenizer_disabled config; added precomputed ingest_allowed_upto_micro and ingest_allowed_in_future_micro fields |
| src/service/db/organization.rs | Added specialized get_org_setting_toggle_ingestion_logs function to avoid cloning entire settings object |
| src/service/logs/*.rs | Updated all log ingestion endpoints (otlp, loki, hec, patterns) to use owned IngestionRequest types |
| src/service/metrics/*.rs | Changed format_stream_name calls to pass owned String values |
| src/handler/http/request/*.rs | Updated ingestion handlers to pass owned bytes/requests instead of references |
| src/config/src/utils/tantivy/tokenizer/*.rs | Simplified tokenizer to always use O2Tokenizer (camel case aware), removing conditional SimpleTokenizer fallback |
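Two of the rows above fit together: parse_timestamp_micro_from_value now also reports whether the value was already usable as-is, and the allowed ingestion window comes from precomputed microsecond bounds instead of being recalculated per record. The sketch below shows an assumed shape, not the real code: the digit-count unit heuristic, the window arithmetic, and the _timestamp handling are simplified illustrations.

```rust
use serde_json::{json, Value};

/// Sketch of the changed return shape: the timestamp in microseconds plus a
/// flag (is_valid) saying the original value was already in microseconds and
/// needs no rewrite. The digit-count heuristic here is an assumption.
fn parse_timestamp_micro_from_value(v: &Value) -> Result<(i64, bool), String> {
    let n = v.as_i64().ok_or("timestamp is not an integer")?;
    match n {
        0..=9_999_999_999 => Ok((n * 1_000_000, false)),              // seconds
        10_000_000_000..=9_999_999_999_999 => Ok((n * 1_000, false)), // milliseconds
        _ => Ok((n, true)),                                           // already microseconds
    }
}

/// Validate against the precomputed window and only rewrite the record when
/// the stored value actually changed.
fn handle_timestamp(
    record: &mut Value,
    now_micros: i64,
    ingest_allowed_upto_micro: i64,      // precomputed: allowed lag into the past
    ingest_allowed_in_future_micro: i64, // precomputed: allowed lead into the future
) -> Result<(), String> {
    let (ts, is_valid) = parse_timestamp_micro_from_value(&record["_timestamp"])?;
    if ts < now_micros - ingest_allowed_upto_micro
        || ts > now_micros + ingest_allowed_in_future_micro
    {
        return Err("timestamp outside the allowed ingestion window".to_string());
    }
    if !is_valid {
        // Only rewrite the JSON when a unit conversion actually happened.
        record["_timestamp"] = json!(ts);
    }
    Ok(())
}
```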
20 files reviewed, no comments
This PR changes the bulk API logic so that it only collects data into `HashMap<Stream, Vec<Value>>` and then uses the `service/logs/ingest/ingest` function to write the data by stream. This unifies all log ingestion behind the same processing function, and each API becomes an interface that only converts its input into `json::Value` records grouped by stream; the code is much simpler now. This PR also reduces some string copying, which improves ingestion performance by about 3%.
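That "interface only" role shows up in the request types: with the lifetimes removed, a handler builds an owned request and moves it into the shared ingest function instead of threading borrowed bytes through every layer. Below is a rough sketch of the shape described for src/common/meta/ingestion.rs; the variant list is trimmed and partly guessed, and only JsonValues with an IngestionValueType source tag is confirmed by the sequence diagram above.

```rust
use bytes::Bytes;
use serde_json::Value;

/// Which API produced the already-parsed JSON values (sketch; the real enum
/// likely carries more variants).
pub enum IngestionValueType {
    Bulk,
    Hec,
    Loki,
}

/// Owned ingestion request: no lifetimes, so each HTTP handler just converts
/// its body into one of these and hands ownership to the shared ingest layer.
pub enum IngestionRequest {
    /// Raw JSON bytes, parsed inside the ingest function (variant name guessed).
    Json(Bytes),
    /// Records already converted to json::Value, grouped by stream upstream.
    JsonValues(IngestionValueType, Vec<Value>),
}
```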