
Conversation

@neha00290
Contributor

@neha00290 neha00290 commented Oct 23, 2025

PR Type

Tests


Description

  • Add pytest for dynamic namespace routing

  • Create and enable pipelines via API

  • Ingest logs post-creation and verify routing

  • Optional cleanup to delete created pipelines
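The lifecycle above can be sketched as a pair of helpers. This is an illustrative sketch, not the actual test code: the endpoint shapes follow the sequence diagrams later in this thread, and the payload field names are assumptions.

```python
# Hypothetical helpers mirroring the test lifecycle; endpoint paths follow
# the sequence diagrams in this thread, payload fields are assumed.
def build_pipeline_payload(name, source_stream, destination_template):
    """Payload for a routing pipeline whose destination stream name may
    contain a runtime template such as '{kubernetes.namespace_name}'."""
    return {
        "name": name,
        "source": {"stream_name": source_stream, "stream_type": "logs"},
        "destination": {"stream_name": destination_template},
    }

def pipeline_endpoints(base_url, org_id, pipeline_id):
    """Create/list, enable, and delete endpoints used by the tests."""
    pipelines = f"{base_url}api/{org_id}/pipelines"
    return {
        "create": pipelines,
        "enable": f"{pipelines}/{pipeline_id}/enable?value=true",
        "delete": f"{pipelines}/{pipeline_id}",
    }
```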


Diagram Walkthrough

flowchart LR
  A["Parametrized test inputs"] -- "build payload" --> B["Create pipeline via API"]
  B -- "enable pipeline" --> C["Ingest logs to source"]
  C -- "wait and verify" --> D["Search destination stream count"]
  C -- "verify source matches" --> E["Search source for condition"]
  D -- "report counts" --> F["Keep pipeline for monitoring"]
  G["Cleanup test"] -- "find by name" --> H["Delete pipeline via API"]

File Walkthrough

Relevant files

Tests
  test_pipeline_dynamic.py (tests/api-testing/tests/test_pipeline_dynamic.py) +243/-0
  Add tests for pipeline dynamic namespace routing

    • Add parametrized test creating dynamic routing pipelines
    • Enable pipeline, ingest logs, verify counts via SQL search
    • Include optional cleanup test to delete created pipelines
    • Use unique node IDs and names; wait for processing

@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Flaky Timing

A fixed 10-second sleep after ingestion may be insufficient or excessive depending on environment load; consider polling with a timeout and backoff for determinism.

# Wait for pipeline to process the newly ingested data
import time
print(f"⏳ Waiting 10 seconds for pipeline to process data...")
time.sleep(10)
Query Mismatch

The condition node uses the 'Contains' operator, but the source verification query tests equality on the same field; align the verification SQL with 'LIKE' (or similar) to truly validate the routing logic.

source_condition_payload = {
    "query": {
        "sql": f'SELECT COUNT(*) as matching_records FROM "{source_stream}" WHERE "{condition_field}" = \'{condition_value}\'',
        "start_time": start_time,
        "end_time": end_time,
        "from": 0,
        "size": 1,
    }
}

resp_source_check = session.post(f"{url}api/{org_id}/_search?type=logs", json=source_condition_payload)
if resp_source_check.status_code == 200:
    source_result = resp_source_check.json()
    if source_result.get("hits", []):
        matching_count = source_result["hits"][0].get("matching_records", 0)
        print(f"📊 Source stream '{source_stream}' has {matching_count} records matching {condition_field}='{condition_value}'")
        if matching_count > 0:
            print(f"✅ Pipeline should route {matching_count} records to destination stream")
        else:
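A minimal sketch of how the verification SQL could be aligned with the pipeline's 'Contains' operator. `build_contains_query` is a hypothetical helper; the identifiers mirror the excerpt above, and value escaping is deliberately minimal.

```python
def build_contains_query(source_stream, condition_field, condition_value,
                         start_time, end_time):
    """Count source records the pipeline's 'Contains' condition would
    match, using LIKE with wildcards instead of strict equality."""
    # Escape single quotes so a value containing ' cannot break the literal
    safe_value = str(condition_value).replace("'", "''")
    sql = (
        f'SELECT COUNT(*) as matching_records FROM "{source_stream}" '
        f'WHERE "{condition_field}" LIKE \'%{safe_value}%\''
    )
    return {"query": {"sql": sql, "start_time": start_time,
                      "end_time": end_time, "from": 0, "size": 1}}
```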
Test Data Coupling

The test relies on the relative path '../test-data/logs_data.json', which breaks when pytest runs from a different working directory. Resolve the path against the test module (or use importlib.resources) to load it reliably.

# Load test data for later ingestion
with open("../test-data/logs_data.json") as f:
    logs_data = json.load(f)
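One way to remove the working-directory dependency, assuming the test file lives under tests/api-testing/tests/ as shown in the walkthrough, is to anchor the path to the test module itself:

```python
import json
from pathlib import Path

def logs_data_path(test_file):
    """Resolve ../test-data/logs_data.json relative to the test module
    rather than the current working directory."""
    return Path(test_file).resolve().parent.parent / "test-data" / "logs_data.json"

def load_logs_data(test_file=__file__):
    # Explicit encoding avoids locale-dependent decoding differences
    with logs_data_path(test_file).open(encoding="utf-8") as f:
        return json.load(f)
```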

@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category / Suggestion / Impact
Category: General
Poll instead of fixed sleep

Replace fixed sleep with a bounded polling loop against the destination stream count
to reduce test flakiness and shorten duration when the pipeline is fast. Poll until
records appear or a timeout is reached.

tests/api-testing/tests/test_pipeline_dynamic.py [132-135]

 import time
-print(f"⏳ Waiting 10 seconds for pipeline to process data...")
-time.sleep(10)
+print("⏳ Waiting for pipeline to process data (up to 60s)...")
+deadline = time.time() + 60
+processed = False
+while time.time() < deadline:
+    search_payload = {
+        "query": {
+            "sql": f'SELECT COUNT(*) as count FROM "{destination_stream}"',
+            "start_time": start_time,
+            "end_time": end_time,
+            "from": 0,
+            "size": 1,
+        }
+    }
+    resp_search = session.post(f"{url}api/{org_id}/_search?type=logs", json=search_payload)
+    if resp_search.status_code == 200:
+        data = resp_search.json()
+        if data.get("hits"):
+            count = data["hits"][0].get("count", 0)
+            if count > 0:
+                processed = True
+                print(f"✅ Destination stream '{destination_stream}' has {count} records")
+                break
+    time.sleep(2)
+assert processed, "Pipeline did not process data within timeout"
Suggestion importance[1-10]: 8


Why: Replacing a fixed 10s sleep with bounded polling reduces flakiness and overall test duration while maintaining correctness; it leverages existing variables and addresses a meaningful stability concern.

Impact: Medium
Safe error body logging

Avoid calling .json() on non-2xx responses, as some servers return non-JSON bodies
leading to exceptions that mask the real failure. Log a safe text snippet instead to
keep the assertion meaningful.

tests/api-testing/tests/test_pipeline_dynamic.py [98-103]

 resp_create_pipeline = session.post(f"{url}api/{org_id}/pipelines", json=pipeline_payload)
 
 if resp_create_pipeline.status_code != expected_status:
-    print(f"Pipeline creation failed. Response: {resp_create_pipeline.json()}")
+    body_preview = resp_create_pipeline.text[:500]
+    print(f"Pipeline creation failed. Status: {resp_create_pipeline.status_code}. Body: {body_preview}")
 
 assert resp_create_pipeline.status_code == expected_status, f"Expected status code {expected_status} but got {resp_create_pipeline.status_code}"
Suggestion importance[1-10]: 7


Why: Avoiding resp.json() on non-JSON error responses prevents masking the real failure and yields clearer logs; aligns with the new hunk code and is a practical reliability improvement.

Impact: Medium
Category: Possible issue
Robust file loading and decoding

Explicitly set encoding when opening the JSON file to avoid locale-dependent
decoding issues in different environments. Also fail fast with a clear message if
the file is missing or malformed to prevent silent test flakiness.

tests/api-testing/tests/test_pipeline_dynamic.py [30-31]

-with open("../test-data/logs_data.json") as f:
-    logs_data = json.load(f)
+try:
+    with open("../test-data/logs_data.json", "r", encoding="utf-8") as f:
+        logs_data = json.load(f)
+except (FileNotFoundError, json.JSONDecodeError) as e:
+    pytest.fail(f"Failed to load test data: {e}")
Suggestion importance[1-10]: 6


Why: Setting explicit UTF-8 encoding and failing fast on missing/malformed JSON improves test robustness and diagnostics; the change accurately targets lines 30-31 and is correct but not critical.

Impact: Low

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR adds comprehensive pytest tests for pipeline dynamic routing based on Kubernetes namespace names. The tests validate pipeline creation, enabling, data ingestion, and conditional routing from source streams to destination streams based on kubernetes.namespace_name field values.

Key Changes:

  • Added test_pipeline_dynamic_namespace_routing with parameterized test cases for three different routing scenarios
  • Included validation of pipeline state, data ingestion, and verification of routed records in destination streams
  • Added cleanup test test_cleanup_dynamic_pipelines to remove created pipelines after testing
  • Tests cover the full pipeline lifecycle: create → enable → ingest → verify → cleanup

Issues Found:

  • SQL queries use f-string interpolation which could be vulnerable to injection (violates custom rule about safe SQL parameter binding)
  • Print statements used throughout instead of a logging framework (violates custom testing guidelines)
  • Import statement for time module placed inside function instead of at module level
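Since the search API accepts raw SQL, one minimal hardening against the f-string interpolation issue, sketched here as hypothetical helpers, is to validate identifiers and escape string literals before interpolation:

```python
import re

def quote_ident(name):
    """Permit only conservative stream/field names, then double-quote them."""
    if not re.fullmatch(r"[\w.\-]+", name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return f'"{name}"'

def quote_literal(value):
    """Escape single quotes for a SQL string literal."""
    return "'" + str(value).replace("'", "''") + "'"

def count_query(stream, field, value):
    """Build the count query used in the verification step, with every
    interpolated piece passed through a quoting helper."""
    return (f"SELECT COUNT(*) as matching_records FROM {quote_ident(stream)} "
            f"WHERE {quote_ident(field)} = {quote_literal(value)}")
```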

Confidence Score: 3/5

  • This PR is moderately safe to merge with some code quality concerns that should be addressed
  • The test functionality appears correct and comprehensive, covering pipeline creation, routing, and cleanup. However, the SQL injection vulnerability (line 180) is a critical concern even in test code as it sets a bad security example and could be exploited if test data is untrusted. The extensive use of print statements instead of logging and the misplaced import statement are style issues that reduce code quality but don't affect functionality.
  • The SQL query construction on line 180 needs immediate attention to prevent potential SQL injection vulnerabilities

Important Files Changed

File Analysis

Filename Score Overview
tests/api-testing/tests/test_pipeline_dynamic.py 3/5 New test file for pipeline dynamic routing with namespace-based conditions; uses print statements instead of logging and has SQL injection concerns

Sequence Diagram

sequenceDiagram
    participant Test as Test Runner
    participant API as OpenObserve API
    participant Pipeline as Pipeline Service
    participant Stream as Data Stream
    
    Test->>API: POST /api/{org}/pipelines (create pipeline)
    API-->>Test: 200 OK (pipeline created)
    
    Test->>API: GET /api/{org}/pipelines (list pipelines)
    API-->>Test: 200 OK (pipeline list with IDs)
    
    Test->>API: PUT /api/{org}/pipelines/{id}/enable?value=true
    API-->>Pipeline: Enable pipeline
    Pipeline-->>API: Pipeline enabled
    API-->>Test: 200 OK
    
    Test->>API: POST /api/{org}/{source_stream}/_json (ingest data)
    API->>Stream: Write to source stream
    Stream->>Pipeline: Trigger pipeline processing
    Pipeline->>Pipeline: Apply condition filter (kubernetes.namespace_name)
    Pipeline->>Stream: Route matching records to destination stream
    API-->>Test: 200 OK (data ingested)
    
    Test->>Test: Sleep 10 seconds (wait for processing)
    
    Test->>API: GET /api/{org}/pipelines (verify pipeline state)
    API-->>Test: 200 OK (pipeline enabled=true)
    
    Test->>API: POST /api/{org}/_search (query destination stream)
    API->>Stream: Query destination stream records
    Stream-->>API: Return record count
    API-->>Test: 200 OK (record count)
    
    Test->>API: POST /api/{org}/_search (query source stream)
    API->>Stream: Query source stream matching records
    Stream-->>API: Return matching count
    API-->>Test: 200 OK (matching count)
    
    Note over Test: Cleanup test (separate test function)
    Test->>API: GET /api/{org}/pipelines (get pipeline ID)
    API-->>Test: 200 OK (pipeline list)
    Test->>API: DELETE /api/{org}/pipelines/{id}
    API->>Pipeline: Delete pipeline
    Pipeline-->>API: Pipeline deleted
    API-->>Test: 200 OK

1 file reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: Subhra264 | Branch: pytests-pipeline-dynamic-names | Commit: 49a365c

Testdino Test Results

Status           | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration
All tests passed | 365   | 342    | 0      | 19      | 4     | 94%       | 4m 39s

View Detailed Results

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: Subhra264 | Branch: pytests-pipeline-dynamic-names | Commit: 49a365c

Testdino Test Results

Status           | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration
All tests passed | 365   | 341    | 0      | 19      | 5     | 93%       | 4m 38s

View Detailed Results

@Subhra264 Subhra264 changed the title from "test: add pipeline dynamic names pytests" to "fix: pipelines should be able to create streams with dynamic names" Oct 23, 2025
@github-actions github-actions bot added the ☢️ Bug Something isn't working label Oct 23, 2025
@neha00290
Contributor Author

@greptile

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR fixes a bug where pipeline destination streams with dynamic template variables (e.g., {kubernetes.namespace_name}) were being incorrectly formatted during validation, preventing runtime template substitution from working correctly.

Key Changes:

  • Removed premature format_stream_name() call in pipeline validation that was converting dots to underscores
  • Removed unused imports (get_config, format_stream_name)
  • Added comprehensive pytest suite to verify dynamic namespace routing works correctly
  • Tests include SQL injection protection helpers and bug detection logic

Impact:
This fix enables pipelines to properly route logs to dynamically-named destination streams based on log field values, a critical feature for multi-tenant and namespace-based routing scenarios.
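The runtime substitution the fix preserves can be illustrated in Python (the actual engine is Rust; this sketch only assumes the template and record shapes shown in the sequence diagram that follows):

```python
import re

def substitute_stream_name(template, record):
    """Expand '{field.path}' placeholders from a flattened log record,
    e.g. 'default_logs_{kubernetes.namespace_name}_test' becomes
    'default_logs_monitoring_test'. Normalization (dots, dashes, etc.
    to underscores) is applied to the substituted value at runtime,
    not to the template at validation time: the essence of the fix."""
    def repl(match):
        value = record.get(match.group(1))
        if value is None:
            raise KeyError(f"record missing template field: {match.group(1)}")
        # Sanitize only the runtime value, never the template itself
        return re.sub(r"[^A-Za-z0-9_]", "_", str(value))
    return re.sub(r"\{([^{}]+)\}", repl, template)
```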

Confidence Score: 4/5

  • This PR is safe to merge with one minor issue to address
  • The Rust code change is minimal and correct - it removes premature formatting that was breaking dynamic templates. However, the test file has a critical issue with relative file path that could cause test failures, and includes an unused helper function.
  • tests/api-testing/tests/test_pipeline_dynamic.py needs the file path issue fixed before merging

Important Files Changed

File Analysis

Filename Score Overview
src/config/src/meta/pipeline/mod.rs 5/5 Removed automatic stream name formatting to allow dynamic template variables like {kubernetes.namespace_name} to work correctly
tests/api-testing/tests/test_pipeline_dynamic.py 4/5 Comprehensive test for dynamic pipeline routing with template substitution, includes SQL injection protection and bug detection logic

Sequence Diagram

sequenceDiagram
    participant Test as Test Suite
    participant API as Pipeline API
    participant Pipeline as Pipeline Engine
    participant Storage as Stream Storage
    
    Test->>API: POST /api/default/pipelines<br/>(destination: {kubernetes.namespace_name})
    API->>Pipeline: Validate pipeline config
    Note over Pipeline: Previously: format_stream_name()<br/>would convert dots to underscores<br/>NOW: keeps template intact
    Pipeline->>API: Return pipeline_id
    API->>Test: 200 OK
    
    Test->>API: PUT /pipelines/{id}/enable
    API->>Pipeline: Enable pipeline
    Pipeline->>API: 200 OK
    
    Test->>API: POST /api/default/source_stream/_json<br/>(logs with kubernetes.namespace_name)
    API->>Pipeline: Process logs through pipeline
    Pipeline->>Pipeline: Evaluate condition<br/>(kubernetes.namespace_name contains "monitoring")
    Pipeline->>Pipeline: Substitute template<br/>({kubernetes.namespace_name} → monitoring)
    Pipeline->>Storage: Write to "default_logs_monitoring_test"
    
    Test->>API: POST /api/default/_search<br/>(query destination stream)
    API->>Storage: SELECT COUNT(*) FROM destination
    Storage->>Test: Return count (verify routing)

2 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: neha00290 | Branch: pytests-pipeline-dynamic-names | Commit: 7b5a39a

Testdino Test Results

Status           | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration
All tests passed | 365   | 343    | 0      | 19      | 3     | 94%       | 4m 39s

View Detailed Results

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: neha00290 | Branch: pytests-pipeline-dynamic-names | Commit: 7b5a39a

Testdino Test Results

Status           | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration
All tests passed | 365   | 341    | 0      | 19      | 5     | 93%       | 4m 40s

View Detailed Results


@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: pytests-pipeline-dynamic-names | Commit: 3f3798a

Testdino Test Results

Status           | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration
All tests passed | 365   | 339    | 0      | 19      | 7     | 93%       | 4m 38s

View Detailed Results

@neha00290 neha00290 merged commit 0a6d9a8 into main Oct 23, 2025
32 checks passed
@neha00290 neha00290 deleted the pytests-pipeline-dynamic-names branch October 23, 2025 12:55