Commit c459273
feat(orchestration): typed ConcurrencyLimit error, edge-case tests, cancel token (#1515 #1516 #1457)
#1515: Add SubAgentError::ConcurrencyLimit { active, max } to replace fragile string matching in record_spawn_failure(). Both spawn() and resume() in SubAgentManager emit the typed variant. DagScheduler's record_spawn_failure now accepts &SubAgentError and uses matches!.

#1516: Add three DagScheduler edge-case tests for concurrency deferral:
- test_concurrency_deferral_does_not_affect_running_task
- test_max_concurrent_zero_no_infinite_loop
- test_all_tasks_deferred_graph_stays_running

#1457: Add plan_cancel_token: Option<CancellationToken> to Agent. run_scheduler_loop gains a tokio::select! on the token at wait_event() and around RunInline execution. handle_plan_cancel fires the token for in-flight plans. The token is always cleared on both Ok and Err paths (CRIT-07). Known limitation: the delivery path requires an agent-loop restructure (SEC-M34-002).
1 parent fd07cd6 commit c459273

File tree

5 files changed, +261 -34 lines changed


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -37,6 +37,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ### Added
 
+- **#1515**: Add `SubAgentError::ConcurrencyLimit { active: usize, max: usize }` variant to replace the fragile `Spawn(String)` concurrency message. `record_spawn_failure()` now accepts `&SubAgentError` and uses a typed `matches!` check instead of string matching. Both `spawn()` and `resume()` in `SubAgentManager` emit the new variant. Callers pass `&e` instead of `&e.to_string()`.
+- **#1516**: Add three edge-case tests for `DagScheduler` concurrency-deferral: running task is unaffected when a concurrent task defers (`test_concurrency_deferral_does_not_affect_running_task`), `max_parallel=0` stalls the scheduler without triggering deadlock detection (`test_max_concurrent_zero_no_infinite_loop`), and all tasks deferring with `ConcurrencyLimit` keep the graph in `Running` and retry on the next tick (`test_all_tasks_deferred_graph_stays_running`).
+- **#1457**: Add `plan_cancel_token: Option<CancellationToken>` to `Agent`. A fresh token is created in `handle_plan_confirm()` and passed into `run_scheduler_loop()`. The tick loop adds a `tokio::select!` branch on `cancel_token.cancelled()` at `wait_event()` (calls `cancel_all()` and breaks) and wraps `RunInline` execution so it can be interrupted. `handle_plan_cancel()` fires the token if a plan is in flight. `plan_cancel_token` is always cleared in both `Ok` and `Err` paths to prevent stale-token bugs. **Known limitation**: the delivery path for `/plan cancel` during active execution requires restructuring the agent message loop (SEC-M34-002; currently only reachable from concurrent-reader channels such as Telegram).
+
 - **#1551**: Remove the `index` feature flag — `zeph-index` and `tree-sitter` are now always-on base dependencies. All `#[cfg(feature = "index")]` guards are removed from `zeph-core`, `zeph` binary, and `lsp_hooks/hover.rs`. The `index` entry is removed from root `Cargo.toml` `[features]` and `full` feature list, and from `zeph-core/Cargo.toml`. Tree-sitter and code index functionality is always compiled; no feature gating required.
 - **#1554**: Decouple repo map injection from Qdrant retriever. `IndexState` now populates `repo_map_tokens`/`repo_map_ttl` independently via `AgentBuilder::with_repo_map()`. The repo map is injected into the system prompt whenever `repo_map_tokens > 0`, regardless of whether a Qdrant-backed `CodeRetriever` is available. Semantic code RAG via Qdrant is unaffected and still requires the retriever. The `apply_code_index()` bootstrap function now configures repo map for all providers (including Claude/OpenAI with native `tool_use`), then skips only the Qdrant retriever setup for tool-use providers. `apply_config()` hot-reload now correctly refreshes both `repo_map_tokens` and `repo_map_ttl`. Fixes silent repo map omission for the most common provider configurations.
 - **#1552**: Replace heuristic AST walking in `generate_repo_map()` with tree-sitter ts-query extraction. New public types in `zeph-index`: `SymbolInfo`, `SymbolKind`, `Visibility`, and `extract_symbols()`. `Lang::symbol_query()` and `Lang::method_query()` provide lazily-compiled `LazyLock<Query>` per language (Rust, Python, JS, TS, Go). Visibility is parsed from `visibility_modifier` node text: `pub`→Public, `pub(crate)`→Crate, `pub(super|in …)`→Restricted, absent→Private. Query compilation failures log a warning and return `None` (no panics); heuristic extraction serves as fallback. Repo map output now includes visibility and 1-based line numbers per symbol (e.g. `pub fn:hello(1)`, `impl:Foo(5){pub fn:bar}`). Token budget behaviour is preserved with the new format. `zeph-index::languages` is now a public module.
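The #1515 entry replaces string matching with a typed variant. A minimal sketch of that pattern, with a simplified stand-in enum (only the two variants relevant here; the real `SubAgentError` and scheduler internals differ):

```rust
use std::fmt;

// Simplified stand-in for the SubAgentError described in the changelog entry.
#[derive(Debug)]
enum SubAgentError {
    Spawn(String),
    ConcurrencyLimit { active: usize, max: usize },
}

impl fmt::Display for SubAgentError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Spawn(msg) => write!(f, "spawn failed: {msg}"),
            Self::ConcurrencyLimit { active, max } => {
                write!(f, "concurrency limit reached ({active}/{max} agents active)")
            }
        }
    }
}

/// Typed check: defer the task on ConcurrencyLimit, fail it otherwise.
/// Before the change this matched on the error *string*, which breaks
/// silently whenever the message wording changes.
fn is_deferrable(err: &SubAgentError) -> bool {
    matches!(err, SubAgentError::ConcurrencyLimit { .. })
}

fn main() {
    let limit = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
    let spawn = SubAgentError::Spawn("binary not found".into());
    assert!(is_deferrable(&limit));
    assert!(!is_deferrable(&spawn));
    println!("{limit}");
}
```

This is also why callers now pass `&e` instead of `&e.to_string()`: the check needs the enum, not its rendered message.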

crates/zeph-core/src/agent/mod.rs

Lines changed: 83 additions & 15 deletions
@@ -247,6 +247,15 @@ pub struct Agent<C: Channel> {
     pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
     /// Graph waiting for `/plan confirm` before execution starts.
     pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
+    /// Cancellation token for the currently executing plan. `None` when no plan is running.
+    /// Created fresh in `handle_plan_confirm()`, cancelled in `handle_plan_cancel()`.
+    ///
+    /// # Known limitation
+    ///
+    /// Token plumbing is ready; the delivery path requires the agent message loop to be
+    /// restructured so `/plan cancel` can be received while `run_scheduler_loop` holds
+    /// `&mut self`. See follow-up issue (SEC-M34-002).
+    plan_cancel_token: Option<CancellationToken>,
 
     /// LSP context injection hooks. Fires after native tool execution, injects
     /// diagnostics/hover notes as `Role::System` messages before the next LLM call.
@@ -452,6 +461,7 @@ impl<C: Channel> Agent<C> {
             },
             pending_image_parts: Vec::new(),
             pending_graph: None,
+            plan_cancel_token: None,
 
             #[cfg(feature = "lsp-context")]
             lsp_hooks: None,
@@ -664,7 +674,18 @@ impl<C: Channel> Agent<C> {
         ))
         .await?;
 
-        let final_status = self.run_scheduler_loop(&mut scheduler, task_count).await?;
+        let plan_token = CancellationToken::new();
+        self.plan_cancel_token = Some(plan_token.clone());
+
+        // Use match instead of ? so plan_cancel_token is always cleared (CRIT-07).
+        let scheduler_result = self
+            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
+            .await;
+        self.plan_cancel_token = None;
+        let final_status = match scheduler_result {
+            Ok(s) => s,
+            Err(e) => return Err(e),
+        };
 
         let completed_graph = scheduler.into_graph();
 
@@ -693,14 +714,17 @@ impl<C: Channel> Agent<C> {
     /// # Known limitations
     ///
     /// The agent is single-threaded; this loop blocks all message processing while
-    /// running. `/plan cancel` cannot interrupt an active execution. A future phase
-    /// will add a `CancellationToken` field to `Agent` and wire it into this loop.
-    /// (SEC-M34-001, tracked in GitHub issue.)
+    /// running. The `cancel_token` parameter wires cancellation into the tick loop at
+    /// `wait_event()` and `RunInline` boundaries. However, `/plan cancel` cannot deliver
+    /// the token signal while `run_scheduler_loop` holds `&mut self` — the agent command
+    /// dispatch is paused. The token plumbing is in place for a follow-up that restructures
+    /// the delivery path (SEC-M34-002).
     #[allow(clippy::too_many_lines)]
     async fn run_scheduler_loop(
         &mut self,
         scheduler: &mut crate::orchestration::DagScheduler,
         task_count: usize,
+        cancel_token: CancellationToken,
     ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
         use crate::orchestration::SchedulerAction;
 
@@ -762,7 +786,7 @@ impl<C: Channel> Agent<C> {
                     }
                     Err(e) => {
                         tracing::error!(error = %e, %task_id, "spawn_for_task failed");
-                        let extra = scheduler.record_spawn_failure(task_id, &e.to_string());
+                        let extra = scheduler.record_spawn_failure(task_id, &e);
                         for a in extra {
                             match a {
                                 SchedulerAction::Cancel { agent_handle_id } => {
@@ -822,14 +846,23 @@ impl<C: Channel> Agent<C> {
 
                     let event_tx = scheduler.event_sender();
                     let max_iter = self.tool_orchestrator.max_iterations;
-                    let outcome = match self.run_inline_tool_loop(&prompt, max_iter).await {
-                        Ok(output) => crate::orchestration::TaskOutcome::Completed {
-                            output,
-                            artifacts: vec![],
-                        },
-                        Err(e) => crate::orchestration::TaskOutcome::Failed {
-                            error: e.to_string(),
-                        },
+                    let outcome = tokio::select! {
+                        result = self.run_inline_tool_loop(&prompt, max_iter) => {
+                            match result {
+                                Ok(output) => crate::orchestration::TaskOutcome::Completed {
+                                    output,
+                                    artifacts: vec![],
+                                },
+                                Err(e) => crate::orchestration::TaskOutcome::Failed {
+                                    error: e.to_string(),
+                                },
+                            }
+                        }
+                        () = cancel_token.cancelled() => {
+                            crate::orchestration::TaskOutcome::Failed {
+                                error: "canceled".to_string(),
+                            }
+                        }
                     };
                     let event = crate::orchestration::TaskEvent {
                         task_id,
@@ -860,7 +893,33 @@ impl<C: Channel> Agent<C> {
                 m.orchestration_graph = Some(snapshot);
             });
 
-            scheduler.wait_event().await;
+            tokio::select! {
+                () = cancel_token.cancelled() => {
+                    let cancel_actions = scheduler.cancel_all();
+                    for action in cancel_actions {
+                        match action {
+                            SchedulerAction::Cancel { agent_handle_id } => {
+                                if let Some(mgr) = self.subagent_manager.as_mut() {
+                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
+                                        tracing::trace!(
+                                            error = %e,
+                                            "cancel during plan cancellation: agent already gone"
+                                        );
+                                    });
+                                }
+                            }
+                            SchedulerAction::Done { status } => {
+                                break 'tick status;
+                            }
+                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
+                        }
+                    }
+                    // Defensive fallback: cancel_all always emits Done, but guard against
+                    // future changes.
+                    break 'tick crate::orchestration::GraphStatus::Canceled;
+                }
+                () = scheduler.wait_event() => {}
+            }
         };
 
         // Final drain: if the loop exited via Done on the first tick, secret
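The hunk above races `cancel_token.cancelled()` against `scheduler.wait_event()` in a `tokio::select!`. That exact form needs the tokio and tokio-util crates; as a std-only analogue of the same idea, a shared flag checked at the top of each tick (the same boundary the `select!` arm guards) lets a concurrent reader interrupt the loop. All names here are illustrative, not zeph's:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum GraphStatus {
    Completed,
    Canceled,
}

// Std-only analogue of the cancellation check: the flag is polled at the
// tick boundary, before blocking on the next event, mirroring the
// `() = cancel_token.cancelled()` select! arm at wait_event().
fn run_tick_loop(cancel: &AtomicBool, total_ticks: usize) -> GraphStatus {
    for _ in 0..total_ticks {
        if cancel.load(Ordering::SeqCst) {
            // Real code would call scheduler.cancel_all() and drain its
            // Cancel actions here before breaking out.
            return GraphStatus::Canceled;
        }
        thread::sleep(Duration::from_millis(10)); // stand-in for wait_event().await
    }
    GraphStatus::Completed
}

fn main() {
    let cancel = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancel);
    // A concurrent reader (the role Telegram plays per SEC-M34-002)
    // fires cancellation mid-run from another thread.
    let canceller = thread::spawn(move || {
        thread::sleep(Duration::from_millis(25));
        flag.store(true, Ordering::SeqCst);
    });
    let status = run_tick_loop(&cancel, 100);
    canceller.join().unwrap();
    assert_eq!(status, GraphStatus::Canceled);
}
```

This also makes the known limitation concrete: with no second thread (no concurrent reader), nothing can ever set the flag while the loop runs, which is why CLI-style synchronous channels cannot deliver `/plan cancel` mid-execution.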
@@ -1178,7 +1237,16 @@ impl<C: Channel> Agent<C> {
         &mut self,
         _graph_id: Option<&str>,
     ) -> Result<(), error::AgentError> {
-        if self.pending_graph.take().is_some() {
+        if let Some(token) = self.plan_cancel_token.take() {
+            // In-flight plan: signal cancellation. The scheduler loop will pick this up
+            // in the next tokio::select! iteration at wait_event().
+            // NOTE: Due to &mut self being held by run_scheduler_loop, this branch is only
+            // reachable if the channel has a concurrent reader (e.g. Telegram, TUI events).
+            // CLI and synchronous channels cannot deliver this while the loop is active
+            // (SEC-M34-002).
+            token.cancel();
+            self.channel.send("Canceling plan execution...").await?;
+        } else if self.pending_graph.take().is_some() {
            let now = std::time::Instant::now();
            self.update_metrics(|m| {
                if let Some(ref mut s) = m.orchestration_graph {
