
Refactor: Parallelize Investigation via LangGraph Send() #936

Merged
VaibhavUpreti merged 2 commits into Tracer-Cloud:main from Jeel3011:feature/parallel-investigate-nodes
Apr 27, 2026

Conversation

@Jeel3011
Contributor

Type of Change:

  • Feature
  • Bug Fix
  • Improvement (Refactor / Performance)

Description:
This PR resolves a significant performance bottleneck in the investigate loop by migrating from sequential action execution to a parallelized, native LangGraph Fan-out/Fan-in architecture using langgraph.constants.Send.

Changes:

  • Split the monolithic node_investigate into node_investigate_hypothesis (executing a single action) and merge_hypothesis_results.
  • Implemented the distribute_hypotheses router to dispatch parallel graph branches based on the planned_actions array.
  • Updated AgentState with a custom merge_results_reducer to properly combine isolated subgraph hypothesis_results concurrently.
  • Re-wired graph.py to route through these new parallel nodes natively.
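
The fan-out/fan-in flow in the change list above can be modelled in plain Python. This is a minimal sketch, not the PR's code: `Send` here is a local stand-in for LangGraph's `Send` (a target node name plus a payload), the thread pool stands in for LangGraph's parallel branch execution, and `run_action` and `run_fan_out` are hypothetical helpers introduced only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Send:
    """Local stand-in for LangGraph's Send: a target node plus its private payload."""

    node: str
    arg: dict[str, Any]


def distribute_hypotheses(state: dict[str, Any]) -> list[Send]:
    """Fan-out: emit one branch per planned action."""
    return [
        Send("investigate_hypothesis", {"action_to_run": name})
        for name in state.get("planned_actions", [])
    ]


def run_action(name: str) -> dict[str, Any]:
    """Hypothetical placeholder for an external API call (e.g. a metrics query)."""
    return {"action": name, "ok": True}


def node_investigate_hypothesis(branch_state: dict[str, Any]) -> dict[str, Any]:
    """Each branch runs exactly one action and returns a partial state update."""
    return {"hypothesis_results": [run_action(branch_state["action_to_run"])]}


def run_fan_out(state: dict[str, Any]) -> dict[str, Any]:
    """Assumed driver: run every branch concurrently, then merge (fan-in)."""
    sends = distribute_hypotheses(state)
    with ThreadPoolExecutor(max_workers=max(1, len(sends))) as pool:
        # pool.map preserves the order of the dispatched branches.
        updates = list(pool.map(lambda s: node_investigate_hypothesis(s.arg), sends))
    merged: list[dict[str, Any]] = []
    for update in updates:
        merged = merged + update["hypothesis_results"]  # reducer-style concatenation
    return {**state, "hypothesis_results": merged}


final_state = run_fan_out({"planned_actions": ["query_datadog", "query_prometheus"]})
```

In the real graph the concatenation step would be performed by the reducer attached to `hypothesis_results`, and the branches would be scheduled by LangGraph rather than by an explicit thread pool.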

Testing:

  • Added tests/nodes/test_parallel_investigate.py to verify proper sub-graph routing distribution.
  • Ran all local checks (make lint, make typecheck, make test-cov); the entire test suite passes.
  • Verified graph compilations and state synchronizations remain intact.

Impact Analysis:

  • Backward compatible? Yes. The AgentState schema changes are backwards-compatible additions (action_to_run and hypothesis_results), and the final state mutation behaves identically to the old sequential node.
  • Performance impact: Highly positive. Actions that depend on external APIs (e.g., Datadog, Prometheus) now execute concurrently in the LangGraph lifecycle.

AI-Assisted PRs:

  • I reviewed every single line of AI-generated code.
  • I understand the logic and can explain it in my own words.
  • I tested edge cases (e.g., handling empty planned_actions).
  • I modified output to match project conventions (Ruff formatting, MyPy types).
  • Verified tests pass with the code.

@Jeel3011 force-pushed the feature/parallel-investigate-nodes branch from 4702392 to 00a21ac on April 25, 2026 10:26
@greptile-apps
Contributor

greptile-apps Bot commented Apr 25, 2026

Greptile Summary

This PR replaces the sequential node_investigate with a LangGraph fan-out/fan-in pattern: distribute_hypotheses dispatches each planned action as a parallel Send branch to node_investigate_hypothesis, whose results are collected by the new merge_hypothesis_results node before flowing to diagnose. The architecture change is sound and the graph wiring is correct.

  • P1 – tracker lifecycle bug in parallel.py: tracker.start() is called before the registry lookup; if the action is not found the node returns early without a matching tracker.complete(), leaving the tracker in a dangling state on every unknown-action hit.

Confidence Score: 4/5

Safe to merge after fixing the tracker lifecycle bug in parallel.py.

One P1 finding: tracker.start is called before the registry check in node_investigate_hypothesis, but the early-return path for a missing action name skips tracker.complete, leaving the tracker in an inconsistent state. The remaining findings are P2 (local import, no-op Send workaround, fragile CLEAR sentinel) and do not block merge.

app/nodes/investigate/parallel.py — tracker not completed on registry miss.

Important Files Changed

| Filename | Overview |
|---|---|
| app/nodes/investigate/parallel.py | New node executing one action per parallel branch; has a P1 tracker lifecycle bug when the action is absent from the registry. |
| app/nodes/investigate/merge.py | Renamed/refactored merge node; logic is sound but has a local import inside a helper function that should be top-level. |
| app/pipeline/routing.py | New distribute_hypotheses fan-out router using Send; no-op empty-action workaround adds unnecessary overhead but is functionally correct. |
| app/state/agent_state.py | Adds hypothesis_results with custom reducer and action_to_run; ["CLEAR"] sentinel breaks declared list[dict] type in transit. |
| app/pipeline/graph.py | Graph wiring updated correctly: fan-out via distribute_hypotheses, fan-in through merge_hypothesis_results → diagnose; loop-back to plan_actions intact. |
| tests/nodes/test_parallel_investigate.py | Covers routing distribution and empty-action cases; missing a test for the unknown-action-name path that triggers the tracker bug. |
| tests/cli/test_args.py | New CLI argument tests; unrelated to parallel investigate changes, all look correct. |
| tests/cli/test_discover.py | Expanded test coverage for test-catalog discovery helpers; straightforward and correct. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    PA[plan_actions] --> DH{distribute_hypotheses}
    DH -->|Send action_1| IH1[investigate_hypothesis action_1]
    DH -->|Send action_2| IH2[investigate_hypothesis action_2]
    DH -->|Send action_N| IHN[investigate_hypothesis action_N]
    DH -->|empty: Send no-op| IHE[investigate_hypothesis action empty]
    IH1 -->|appends hypothesis_results| MH[merge_hypothesis_results]
    IH2 -->|appends hypothesis_results| MH
    IHN -->|appends hypothesis_results| MH
    IHE -->|returns empty results| MH
    MH -->|clears hypothesis_results via CLEAR sentinel| D[diagnose]
    D -->|investigate| PA
    D -->|publish| PUB[publish END]

Reviews (1): Last reviewed commit: "feat(graph): migrate sequential investig..."

Comment on lines +25 to +34
tracker.start(f"investigate_{action_name}", f"Executing {action_name}")

available_sources = cast(dict[str, dict[str, object]], state.get("available_sources", {}))
all_actions = get_available_actions()
actions_by_name = {action.name: action for action in all_actions}

# Check if action is available
if action_name not in actions_by_name:
    logger.warning("Planned action '%s' not found in action registry", action_name)
    return {"hypothesis_results": []}
Contributor

P1 Tracker started but not completed on registry miss

tracker.start() is called on line 25 before the registry lookup, but when the action is absent from the registry the function returns early on line 34 without ever calling tracker.complete(). This leaves the tracker in a dangling state, which can corrupt CLI progress output on the next tracking call (the graph-nodes convention requires every tracker.start to be paired with a tracker.complete).

    # Check if action is available
    if action_name not in actions_by_name:
        logger.warning("Planned action '%s' not found in action registry", action_name)
        tracker.complete(
            f"investigate_{action_name}",
            fields_updated=[],
            message=f"Skipped {action_name}: not in registry",
        )
        return {"hypothesis_results": []}
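
One way to make the start/complete pairing hard to break is to wrap it in a context manager, so every return path (including the registry miss) completes the tracker. The sketch below is illustrative only: `RecordingTracker`, `tracked`, and `investigate` are hypothetical names, and the tracker's `start`/`complete` signatures are assumed from the calls shown in this thread rather than taken from the project's tracker implementation.

```python
import logging
from contextlib import contextmanager
from typing import Any, Callable, Iterator

logger = logging.getLogger(__name__)


class RecordingTracker:
    """Hypothetical stand-in for the project's tracker; records calls for inspection."""

    def __init__(self) -> None:
        self.open: set[str] = set()
        self.events: list[tuple[str, str]] = []

    def start(self, key: str, message: str) -> None:
        self.open.add(key)
        self.events.append(("start", key))

    def complete(self, key: str, fields_updated: list[str], message: str = "") -> None:
        self.open.discard(key)
        self.events.append(("complete", key))


@contextmanager
def tracked(tracker: RecordingTracker, key: str, start_message: str) -> Iterator[dict[str, Any]]:
    """Start the tracker on entry and always complete it on exit."""
    outcome: dict[str, Any] = {"fields_updated": [], "message": ""}
    tracker.start(key, start_message)
    try:
        yield outcome
    finally:
        # Runs on normal return, early return, and exceptions alike.
        tracker.complete(key, fields_updated=outcome["fields_updated"], message=outcome["message"])


def investigate(
    tracker: RecordingTracker,
    action_name: str,
    registry: dict[str, Callable[[], dict[str, Any]]],
) -> dict[str, Any]:
    with tracked(tracker, f"investigate_{action_name}", f"Executing {action_name}") as outcome:
        if action_name not in registry:
            logger.warning("Planned action '%s' not found in action registry", action_name)
            outcome["message"] = f"Skipped {action_name}: not in registry"
            return {"hypothesis_results": []}
        outcome["fields_updated"] = ["hypothesis_results"]
        return {"hypothesis_results": [registry[action_name]()]}
```

The same shape also gives a natural test for the unknown-action path: call the node with a name that is missing from the registry and check that no tracker key is left open.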

Comment thread app/nodes/investigate/merge.py Outdated
logger.exception("OpenSRE telemetry load failed during evidence gathering")
return prior, None
ev = seed.get("evidence") or prior
from app.output import debug_print
Contributor

P2 Local import inside function body

debug_print is imported inside _load_opensre_telemetry_into_evidence rather than at module level. The top-level from app.output import get_tracker import already pulls from the same module; debug_print should be added there.

Suggested change
- from app.output import debug_print
+ from app.output import debug_print, get_tracker

Then remove the local import on line 39.

Comment thread app/pipeline/routing.py Outdated
Comment on lines +55 to +62
if not actions:
    # No actions planned, skip to merge
    return [
        Send(
            "investigate_hypothesis",
            {"action_to_run": "", "available_sources": available_sources},
        )
    ]
Contributor

P2 No-op Send on empty planned_actions

When there are no planned actions the router dispatches a dummy Send with action_to_run: "" to investigate_hypothesis, which returns empty results and then routes into merge_hypothesis_results — performing _load_opensre_telemetry_into_evidence and summarize_execution_results unnecessarily. The old code short-circuited in this case. Consider routing directly to merge_hypothesis_results (e.g. via a Send("merge_hypothesis_results", {...})) to preserve the original fast-path behaviour and avoid the redundant work.
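
The suggested fast path could look roughly like the sketch below. It is a hedged illustration, not the router from routing.py: `Send` is a local stand-in for LangGraph's `Send` so the snippet runs on its own, and the payload passed to `merge_hypothesis_results` is an assumption about what that node needs.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Send:
    """Local stand-in for LangGraph's Send: a target node plus its payload."""

    node: str
    arg: dict[str, Any]


def distribute_hypotheses(state: dict[str, Any]) -> list[Send]:
    """Fan out one branch per action, or skip straight to the merge node."""
    actions = state.get("planned_actions") or []
    available_sources = state.get("available_sources", {})

    if not actions:
        # Fast path: nothing to investigate, so bypass investigate_hypothesis.
        # The payload shape here is assumed for illustration.
        return [Send("merge_hypothesis_results", {"available_sources": available_sources})]

    return [
        Send(
            "investigate_hypothesis",
            {"action_to_run": name, "available_sources": available_sources},
        )
        for name in actions
    ]


empty_route = distribute_hypotheses({"planned_actions": []})
busy_route = distribute_hypotheses({"planned_actions": ["a", "b"]})
```

In a real graph, the merge node would presumably also need to be declared as a possible destination of the conditional edge for this route to compile.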

Comment thread app/state/agent_state.py Outdated
Comment on lines +22 to +29
def merge_results_reducer(existing: list | None, new: list | None) -> list:
    if new == ["CLEAR"]:
        return []
    if not existing:
        return new or []
    if not new:
        return existing
    return existing + new
Contributor

P2 Fragile ["CLEAR"] sentinel breaks declared type

The reducer handles the special value ["CLEAR"] (a list[str]) to reset the field, but hypothesis_results is declared as list[dict[str, Any]]. Any type-checker or Pydantic validator that inspects the in-flight state during the clear cycle may reject this as a type mismatch. A narrower sentinel — for example a dedicated dataclass or simply returning None and guarding for None in the reducer — would be safer and self-documenting.
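
A dedicated sentinel type, as suggested above, might be sketched as follows. This is an assumption-laden example rather than the merged fix: `_Clear` and `CLEAR` are hypothetical names, and whether LangGraph's state handling accepts a non-list update for this field has not been verified here.

```python
from __future__ import annotations

from typing import Any, Final


class _Clear:
    """Hypothetical marker type meaning 'reset hypothesis_results'."""

    def __repr__(self) -> str:
        return "CLEAR"


CLEAR: Final = _Clear()


def merge_results_reducer(
    existing: list[dict[str, Any]] | None,
    new: list[dict[str, Any]] | _Clear | None,
) -> list[dict[str, Any]]:
    """Concatenate branch results, or reset when the sentinel is received."""
    if isinstance(new, _Clear):
        return []
    # Treat None on either side as an empty list.
    return (existing or []) + (new or [])
```

Because the sentinel has its own type, a result list can never be mistaken for a reset request, and the reset case is visible in the reducer's signature.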

@@ -0,0 +1,47 @@
"""Tests for parallel investigate hypothesis routing and merging."""

import pytest
from langgraph.constants import Send

from app.nodes.investigate.execution.execute_actions import ActionExecutionResult
from app.nodes.investigate.merge import merge_hypothesis_results
Member

@Jeel3011 could you please address these?

@Jeel3011
Contributor Author

Fixed it, please check.

Member

@VaibhavUpreti VaibhavUpreti left a comment

Awesome @Jeel3011, thanks a lot for adding this. Welcome to the OpenSRE community!

@VaibhavUpreti VaibhavUpreti merged commit 872b830 into Tracer-Cloud:main Apr 27, 2026
7 checks passed