Refactor: Parallelize Investigation via LangGraph Send()#936
Conversation
4702392 to
00a21ac
Compare
Greptile SummaryThis PR replaces the sequential
Confidence Score: 4/5Safe to merge after fixing the tracker lifecycle bug in parallel.py. One P1 finding: tracker.start is called before the registry check in node_investigate_hypothesis, but the early-return path for a missing action name skips tracker.complete, leaving the tracker in an inconsistent state. The remaining findings are P2 (local import, no-op Send workaround, fragile CLEAR sentinel) and do not block merge. app/nodes/investigate/parallel.py — tracker not completed on registry miss. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
PA[plan_actions] --> DH{distribute_hypotheses}
DH -->|Send action_1| IH1[investigate_hypothesis action_1]
DH -->|Send action_2| IH2[investigate_hypothesis action_2]
DH -->|Send action_N| IHN[investigate_hypothesis action_N]
DH -->|empty: Send no-op| IHE[investigate_hypothesis action empty]
IH1 -->|appends hypothesis_results| MH[merge_hypothesis_results]
IH2 -->|appends hypothesis_results| MH
IHN -->|appends hypothesis_results| MH
IHE -->|returns empty results| MH
MH -->|clears hypothesis_results via CLEAR sentinel| D[diagnose]
D -->|investigate| PA
D -->|publish| PUB[publish END]
Reviews (1): Last reviewed commit: "feat(graph): migrate sequential investig..." | Re-trigger Greptile |
| tracker.start(f"investigate_{action_name}", f"Executing {action_name}") | ||
|
|
||
| available_sources = cast(dict[str, dict[str, object]], state.get("available_sources", {})) | ||
| all_actions = get_available_actions() | ||
| actions_by_name = {action.name: action for action in all_actions} | ||
|
|
||
| # Check if action is available | ||
| if action_name not in actions_by_name: | ||
| logger.warning("Planned action '%s' not found in action registry", action_name) | ||
| return {"hypothesis_results": []} |
There was a problem hiding this comment.
Tracker started but not completed on registry miss
tracker.start() is called on line 25 before the registry lookup, but when the action is absent from the registry the function returns early on line 34 without ever calling tracker.complete(). This leaves the tracker in a dangling state, which can corrupt CLI progress output on the next tracking call (the graph-nodes convention requires every tracker.start to be paired with a tracker.complete).
# Check if action is available
if action_name not in actions_by_name:
logger.warning("Planned action '%s' not found in action registry", action_name)
tracker.complete(
f"investigate_{action_name}",
fields_updated=[],
message=f"Skipped {action_name}: not in registry",
)
return {"hypothesis_results": []}| logger.exception("OpenSRE telemetry load failed during evidence gathering") | ||
| return prior, None | ||
| ev = seed.get("evidence") or prior | ||
| from app.output import debug_print |
There was a problem hiding this comment.
Local import inside function body
debug_print is imported inside _load_opensre_telemetry_into_evidence rather than at module level. The top-level from app.output import get_tracker import already pulls from the same module; debug_print should be added there.
| from app.output import debug_print | |
| from app.output import debug_print, get_tracker |
Then remove the local import on line 39.
| if not actions: | ||
| # No actions planned, skip to merge | ||
| return [ | ||
| Send( | ||
| "investigate_hypothesis", | ||
| {"action_to_run": "", "available_sources": available_sources}, | ||
| ) | ||
| ] |
There was a problem hiding this comment.
No-op Send on empty
planned_actions
When there are no planned actions the router dispatches a dummy Send with action_to_run: "" to investigate_hypothesis, which returns empty results and then routes into merge_hypothesis_results — performing _load_opensre_telemetry_into_evidence and summarize_execution_results unnecessarily. The old code short-circuited in this case. Consider routing directly to merge_hypothesis_results (e.g. via a Send("merge_hypothesis_results", {...})) to preserve the original fast-path behaviour and avoid the redundant work.
| def merge_results_reducer(existing: list | None, new: list | None) -> list: | ||
| if new == ["CLEAR"]: | ||
| return [] | ||
| if not existing: | ||
| return new or [] | ||
| if not new: | ||
| return existing | ||
| return existing + new |
There was a problem hiding this comment.
Fragile
["CLEAR"] sentinel breaks declared type
The reducer handles the special value ["CLEAR"] (a list[str]) to reset the field, but hypothesis_results is declared as list[dict[str, Any]]. Any type-checker or Pydantic validator that inspects the in-flight state during the clear cycle may reject this as a type mismatch. A narrower sentinel — for example a dedicated dataclass or simply returning None and guarding for None in the reducer — would be safer and self-documenting.
| @@ -0,0 +1,47 @@ | |||
| """Tests for parallel investigate hypothesis routing and merging.""" | |||
|
|
|||
| import pytest | |||
| import pytest | ||
| from langgraph.constants import Send | ||
|
|
||
| from app.nodes.investigate.execution.execute_actions import ActionExecutionResult |
| from langgraph.constants import Send | ||
|
|
||
| from app.nodes.investigate.execution.execute_actions import ActionExecutionResult | ||
| from app.nodes.investigate.merge import merge_hypothesis_results |
|
fixed it , check it |
VaibhavUpreti
left a comment
There was a problem hiding this comment.
awesome @Jeel3011 , thanks a lot for adding this, welcome to the OpenSRE community
Type of Change:
Description:
This PR resolves a significant performance bottleneck in the
investigateloop by migrating from sequential action execution to a parallelized, native LangGraph Fan-out/Fan-in architecture usinglanggraph.constants.Send.Changes:
node_investigateintonode_investigate_hypothesis(executing a single action) andmerge_hypothesis_results.distribute_hypothesesrouter to dispatch parallel graph branches based on theplanned_actionsarray.AgentStatewith a custommerge_results_reducerto properly combine isolated subgraphhypothesis_resultsconcurrently.graph.pyto route through these new parallel nodes natively.Testing:
tests/nodes/test_parallel_investigate.pyto verify proper sub-graph routing distribution.make lint,make typecheck,make test-cov); the entire test suite passes.Impact Analysis:
AgentStateschema changes are backwards-compatible additions (action_to_runandhypothesis_results), and the final state mutation behaves identically to the old sequential node.AI-Assisted PRs:
planned_actions).