-
Notifications
You must be signed in to change notification settings - Fork 715
fix: pipelines should be able to create streams with dynamic names #8874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR adds comprehensive pytest tests for pipeline dynamic routing based on Kubernetes namespace names. The tests validate pipeline creation, enabling, data ingestion, and conditional routing from source streams to destination streams based on kubernetes.namespace_name field values.
Key Changes:
- Added
test_pipeline_dynamic_namespace_routingwith parameterized test cases for three different routing scenarios - Included validation of pipeline state, data ingestion, and verification of routed records in destination streams
- Added cleanup test
test_cleanup_dynamic_pipelinesto remove created pipelines after testing - Tests cover the full pipeline lifecycle: create → enable → ingest → verify → cleanup
Issues Found:
- SQL queries use f-string interpolation which could be vulnerable to injection (violates custom rule about safe SQL parameter binding)
- Print statements used throughout instead of a logging framework (violates custom testing guidelines)
- Import statement for
timemodule placed inside function instead of at module level
Confidence Score: 3/5
- This PR is moderately safe to merge with some code quality concerns that should be addressed
- The test functionality appears correct and comprehensive, covering pipeline creation, routing, and cleanup. However, the SQL injection vulnerability (line 180) is a critical concern even in test code as it sets a bad security example and could be exploited if test data is untrusted. The extensive use of print statements instead of logging and the misplaced import statement are style issues that reduce code quality but don't affect functionality.
- The SQL query construction on line 180 needs immediate attention to prevent potential SQL injection vulnerabilities
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| tests/api-testing/tests/test_pipeline_dynamic.py | 3/5 | New test file for pipeline dynamic routing with namespace-based conditions; uses print statements instead of logging and has SQL injection concerns |
Sequence Diagram
sequenceDiagram
participant Test as Test Runner
participant API as OpenObserve API
participant Pipeline as Pipeline Service
participant Stream as Data Stream
Test->>API: POST /api/{org}/pipelines (create pipeline)
API-->>Test: 200 OK (pipeline created)
Test->>API: GET /api/{org}/pipelines (list pipelines)
API-->>Test: 200 OK (pipeline list with IDs)
Test->>API: PUT /api/{org}/pipelines/{id}/enable?value=true
API-->>Pipeline: Enable pipeline
Pipeline-->>API: Pipeline enabled
API-->>Test: 200 OK
Test->>API: POST /api/{org}/{source_stream}/_json (ingest data)
API->>Stream: Write to source stream
Stream->>Pipeline: Trigger pipeline processing
Pipeline->>Pipeline: Apply condition filter (kubernetes.namespace_name)
Pipeline->>Stream: Route matching records to destination stream
API-->>Test: 200 OK (data ingested)
Test->>Test: Sleep 10 seconds (wait for processing)
Test->>API: GET /api/{org}/pipelines (verify pipeline state)
API-->>Test: 200 OK (pipeline enabled=true)
Test->>API: POST /api/{org}/_search (query destination stream)
API->>Stream: Query destination stream records
Stream-->>API: Return record count
API-->>Test: 200 OK (record count)
Test->>API: POST /api/{org}/_search (query source stream)
API->>Stream: Query source stream matching records
Stream-->>API: Return matching count
API-->>Test: 200 OK (matching count)
Note over Test: Cleanup test (separate test function)
Test->>API: GET /api/{org}/pipelines (get pipeline ID)
API-->>Test: 200 OK (pipeline list)
Test->>API: DELETE /api/{org}/pipelines/{id}
API->>Pipeline: Delete pipeline
Pipeline-->>API: Pipeline deleted
API-->>Test: 200 OK
1 file reviewed, 3 comments
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 342 | 0 | 19 | 4 | 94% | 4m 39s |
…ve/openobserve into pytests-pipeline-dynamic-names
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 341 | 0 | 19 | 5 | 93% | 4m 38s |
…ve/openobserve into pytests-pipeline-dynamic-names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR fixes a bug where pipeline destination streams with dynamic template variables (e.g., {kubernetes.namespace_name}) were being incorrectly formatted during validation, preventing runtime template substitution from working correctly.
Key Changes:
- Removed premature
format_stream_name()call in pipeline validation that was converting dots to underscores - Removed unused imports (
get_config,format_stream_name) - Added comprehensive pytest suite to verify dynamic namespace routing works correctly
- Tests include SQL injection protection helpers and bug detection logic
Impact:
This fix enables pipelines to properly route logs to dynamically-named destination streams based on log field values, a critical feature for multi-tenant and namespace-based routing scenarios.
Confidence Score: 4/5
- This PR is safe to merge with one minor issue to address
- The Rust code change is minimal and correct - it removes premature formatting that was breaking dynamic templates. However, the test file has a critical issue with relative file path that could cause test failures, and includes an unused helper function.
- tests/api-testing/tests/test_pipeline_dynamic.py needs the file path issue fixed before merging
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/config/src/meta/pipeline/mod.rs | 5/5 | Removed automatic stream name formatting to allow dynamic template variables like {kubernetes.namespace_name} to work correctly |
| tests/api-testing/tests/test_pipeline_dynamic.py | 4/5 | Comprehensive test for dynamic pipeline routing with template substitution, includes SQL injection protection and bug detection logic |
Sequence Diagram
sequenceDiagram
participant Test as Test Suite
participant API as Pipeline API
participant Pipeline as Pipeline Engine
participant Storage as Stream Storage
Test->>API: POST /api/default/pipelines<br/>(destination: {kubernetes.namespace_name})
API->>Pipeline: Validate pipeline config
Note over Pipeline: Previously: format_stream_name()<br/>would convert dots to underscores<br/>NOW: keeps template intact
Pipeline->>API: Return pipeline_id
API->>Test: 200 OK
Test->>API: PUT /pipelines/{id}/enable
API->>Pipeline: Enable pipeline
Pipeline->>API: 200 OK
Test->>API: POST /api/default/source_stream/_json<br/>(logs with kubernetes.namespace_name)
API->>Pipeline: Process logs through pipeline
Pipeline->>Pipeline: Evaluate condition<br/>(kubernetes.namespace_name contains "monitoring")
Pipeline->>Pipeline: Substitute template<br/>({kubernetes.namespace_name} → monitoring)
Pipeline->>Storage: Write to "default_logs_monitoring_test"
Test->>API: POST /api/default/_search<br/>(query destination stream)
API->>Storage: SELECT COUNT(*) FROM destination
Storage->>Test: Return count (verify routing)
2 files reviewed, 2 comments
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 343 | 0 | 19 | 3 | 94% | 4m 39s |
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 341 | 0 | 19 | 5 | 93% | 4m 40s |
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 343 | 0 | 19 | 3 | 94% | 4m 39s |
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 365 | 339 | 0 | 19 | 7 | 93% | 4m 38s |
PR Type
Tests
Description
Add pytest for dynamic namespace routing
Create and enable pipelines via API
Ingest logs post-creation and verify routing
Optional cleanup to delete created pipelines
Diagram Walkthrough
File Walkthrough
test_pipeline_dynamic.py
Add tests for pipeline dynamic namespace routingtests/api-testing/tests/test_pipeline_dynamic.py