fix: distribute agent snapshots to shared-nothing workers #1986
Conversation
📝 Walkthrough

This PR introduces distributed agent snapshots—a versioned, gzip-compressed wire format capturing agent configuration, models, skills, souls, and memory state for execution across distributed workers. It adds snapshot building, serialization, and read-only store wrappers, integrates snapshots into the distributed task dispatch pipeline, and enables worker nodes to hydrate agent resources from snapshots instead of local stores.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 6
🧹 Nitpick comments (2)
internal/runtime/executor/task.go (1)
116-121: Defensively copy the snapshot bytes before attaching to the task.
`[]byte` is mutable; assigning it directly can leak caller-side mutations into the dispatched payload. Copying here makes task construction safer.

♻️ Proposed change

```diff
 func WithAgentSnapshot(snapshot []byte) TaskOption {
 	return func(task *coordinatorv1.Task) {
-		task.AgentSnapshot = snapshot
+		if len(snapshot) == 0 {
+			task.AgentSnapshot = nil
+			return
+		}
+		task.AgentSnapshot = append([]byte(nil), snapshot...)
 	}
 }
```

🤖 Prompt for AI Agents
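The aliasing risk behind this suggestion can be shown with a stripped-down sketch. `Task` here is a hypothetical stand-in for the generated `coordinatorv1.Task`, not the real protobuf type:

```go
package main

import "fmt"

// Task is a stand-in for coordinatorv1.Task (assumption: the real type is protobuf-generated).
type Task struct {
	AgentSnapshot []byte
}

// withAgentSnapshot copies the snapshot so later caller-side writes
// cannot mutate the dispatched payload.
func withAgentSnapshot(task *Task, snapshot []byte) {
	if len(snapshot) == 0 {
		task.AgentSnapshot = nil
		return
	}
	task.AgentSnapshot = append([]byte(nil), snapshot...)
}

func main() {
	buf := []byte("v1-snapshot")
	var t Task
	withAgentSnapshot(&t, buf)
	buf[0] = 'X' // caller mutates its slice after task construction
	fmt.Println(string(t.AgentSnapshot)) // v1-snapshot
}
```

With direct assignment instead of the copy, the final print would show the caller's mutation (`X1-snapshot`).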
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/executor/task.go` around lines 116 - 121, WithAgentSnapshot currently assigns the incoming []byte directly to coordinatorv1.Task.AgentSnapshot which can leak mutations; change WithAgentSnapshot to defensively copy the snapshot (handle nil by setting nil) before assigning so the Task holds its own byte slice (e.g., allocate a new []byte of len(snapshot) and copy into it) to prevent caller-side mutations from affecting the task.

internal/service/scheduler/scheduler.go (1)
116-119: Prefer an interface-bound `DAGStore` dependency here.

Snapshot enrichment now depends on `er` being `*entryReaderImpl`. With any other `EntryReader`, external sub-DAG resolution disappears and scheduler-built snapshots can become incomplete. Passing `exec.DAGStore` into `newScheduler` directly, or exposing it on the interface, would avoid that hidden coupling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/scheduler.go` around lines 116 - 119, The current code casts er to *entryReaderImpl to grab impl.dagStore which hides a dependency and breaks snapshot enrichment for other EntryReader implementations; change newScheduler to accept an exec.DAGStore parameter (or add a DAGStore() exec.DAGStore method to the EntryReader interface) and use that explicit DAGStore inside newScheduler/scheduler construction instead of the type assertion, updating call sites that construct the scheduler to pass the DAGStore through.
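One way to remove the hidden coupling is to pass the store through the constructor. A minimal sketch, assuming a pared-down `DAGStore` interface and an in-memory `memStore` (both illustrative, not the real `exec.DAGStore`):

```go
package main

import "fmt"

// DAGStore mirrors the dependency the scheduler needs (assumption: the
// real exec.DAGStore interface is larger than this).
type DAGStore interface {
	GetSpec(name string) (string, error)
}

// memStore is a toy in-memory implementation for illustration.
type memStore map[string]string

func (m memStore) GetSpec(name string) (string, error) {
	spec, ok := m[name]
	if !ok {
		return "", fmt.Errorf("dag %q not found", name)
	}
	return spec, nil
}

// scheduler holds the store explicitly instead of type-asserting its EntryReader.
type scheduler struct {
	dagStore DAGStore
}

// newScheduler takes the DAGStore as a constructor parameter, so any
// EntryReader implementation keeps snapshot enrichment working.
func newScheduler(store DAGStore) *scheduler {
	return &scheduler{dagStore: store}
}

func main() {
	s := newScheduler(memStore{"etl": "steps: [...]"})
	spec, err := s.dagStore.GetSpec("etl")
	fmt.Println(spec, err) // steps: [...] <nil>
}
```

The alternative the comment mentions—adding a `DAGStore()` accessor to the `EntryReader` interface—trades a wider interface for one fewer constructor parameter.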
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/agent/snapshot.go`:
- Around line 422-427: The code silently ignores errors from the resolve
function in the r.walk path, causing BuildSnapshotForDAG to miss nested agent
steps; change r.walk (and its callers, including BuildSnapshotForDAG) to return
an error and, where resolve != nil, if resolve(ctx, target) returns a non-nil
err then return that error instead of skipping the branch; keep the existing
behavior only when resolve == nil (no-op), but ensure all call sites propagate
the returned error up so sub-DAG resolution failures surface during snapshot
planning.
- Around line 207-215: The snapshot currently uses reqs.sortedDAGNames() when
needsMemory is set by step.Agent.Memory.Enabled, which pulls memory from all
visited DAGs; instead track DAGs that opted into agent memory separately (e.g.,
build a reqs.memoryDAGNames or reqs.sortedMemoryDAGNames() collection while
evaluating step.Agent.Memory.Enabled) and pass that collection into
snapshotMemory(ctx, stores.MemoryStore, reqs.sortedMemoryDAGNames()) so
snapshot.Memory only contains DAG-scoped memory that explicitly enabled agent
memory; apply the same change wherever snapshotMemory or memory population is
invoked (the other occurrences referenced around the later blocks) and ensure
needsMemory remains driven by step.Agent.Memory.Enabled but uses the new
memory-specific name list for extraction and snapshot.Memory assignment.
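The memory-opt-in tracking could look roughly like this; `step` and its fields are simplified stand-ins for the real DAG step structures:

```go
package main

import (
	"fmt"
	"sort"
)

// step mirrors the relevant fields only (assumption: the real step carries
// an Agent config with a Memory.Enabled flag).
type step struct {
	dagName       string
	memoryEnabled bool
}

// collectMemoryDAGNames returns only the DAGs whose steps explicitly enabled
// agent memory, instead of every DAG visited during snapshot building.
func collectMemoryDAGNames(steps []step) []string {
	seen := map[string]bool{}
	for _, s := range steps {
		if s.memoryEnabled {
			seen[s.dagName] = true
		}
	}
	names := make([]string, 0, len(seen))
	for n := range seen {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic order, like sortedDAGNames()
	return names
}

func main() {
	steps := []step{
		{dagName: "etl", memoryEnabled: true},
		{dagName: "report", memoryEnabled: false},
		{dagName: "etl", memoryEnabled: true},
	}
	fmt.Println(collectMemoryDAGNames(steps)) // [etl]
}
```

Passing this narrower list to `snapshotMemory` keeps `snapshot.Memory` scoped to DAGs that opted in.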
- Around line 258-268: The helpers snapshotSkills and snapshotSouls currently
treat ErrSkillNotFound/ErrSoulNotFound as success and skip missing entries;
change them to fail-fast by returning an error when store.GetByID returns
ErrSkillNotFound or ErrSoulNotFound instead of continuing. Update the
switch/default branches in snapshotSkills (handling SkillStore.GetByID and
ErrSkillNotFound) and the analogous logic in snapshotSouls (SoulStore.GetByID
and ErrSoulNotFound) to return a descriptive wrapped error (e.g.,
fmt.Errorf("load skill %q for snapshot: %w", id, ErrSkillNotFound)) so the
caller sees missing references rather than producing an incomplete snapshot.
- Around line 106-117: The gzip decode path currently uses io.ReadAll(zr) which
allows decompressed data to grow unbounded; replace that with a bounded read
using an io.LimitedReader (or io.LimitReader) and a package-level constant like
maxUncompressedSnapshotSize to cap the decompressed size in the function that
opens the gzip reader (the block using gzip.NewReader and variable zr). Read up
to maxSize+1 bytes so you can detect overflow, and if the read returns more than
maxUncompressedSnapshotSize, return a clear error (e.g., "agent snapshot too
large"); ensure zr is still closed in the defer and propagate other read errors
as before.
In `@internal/agent/system_prompt.txt`:
- Around line 59-64: The template emits <memory_paths> even when Memory.ReadOnly
is true, which contradicts the read-only contract; update
internal/agent/system_prompt.txt to render the <memory_paths> section only when
MemoryDir is set AND Memory.ReadOnly is false (i.e., wrap the <memory_paths>
block in a conditional that checks !Memory.ReadOnly), leaving the existing
<memory_mode> block for the read-only notice intact so the prompt advertises no
writable memory targets during snapshot-backed runs.
In `@internal/service/scheduler/dag_executor.go`:
- Around line 255-262: The buildSnapshotBuilder function currently returns nil
when dagStore is nil, disabling snapshot creation; instead always return the
closure that calls agentsnapshot.BuildFromPaths so BuildFromPaths can handle a
nil dagStore itself—update buildSnapshotBuilder (the returned anonymous func) to
be returned unconditionally and simply call agentsnapshot.BuildFromPaths(ctx,
dag, paths, dagStore) even when dagStore is nil.
---
Nitpick comments:
In `@internal/runtime/executor/task.go`:
- Around line 116-121: WithAgentSnapshot currently assigns the incoming []byte
directly to coordinatorv1.Task.AgentSnapshot which can leak mutations; change
WithAgentSnapshot to defensively copy the snapshot (handle nil by setting nil)
before assigning so the Task holds its own byte slice (e.g., allocate a new
[]byte of len(snapshot) and copy into it) to prevent caller-side mutations from
affecting the task.
In `@internal/service/scheduler/scheduler.go`:
- Around line 116-119: The current code casts er to *entryReaderImpl to grab
impl.dagStore which hides a dependency and breaks snapshot enrichment for other
EntryReader implementations; change newScheduler to accept an exec.DAGStore
parameter (or add a DAGStore() exec.DAGStore method to the EntryReader
interface) and use that explicit DAGStore inside newScheduler/scheduler
construction instead of the type assertion, updating call sites that construct
the scheduler to pass the DAGStore through.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ad64e02c-b760-41af-8cca-fb7aee116c8a
⛔ Files ignored due to path filters (3)
proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
proto/coordinator/v1/coordinator_protoopaque.pb.go is excluded by !**/*.pb.go
proto/index/v1/index.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (26)
internal/agent/model_config.go
internal/agent/snapshot.go
internal/agent/snapshot_stores.go
internal/agent/snapshot_stores_test.go
internal/agent/snapshot_test.go
internal/agent/system_prompt.txt
internal/agent/system_prompt_test.go
internal/agentsnapshot/dispatch.go
internal/agentsnapshot/dispatch_test.go
internal/intg/queue/queue_test.go
internal/runtime/builtin/agentstep/executor.go
internal/runtime/executor/dag_runner.go
internal/runtime/executor/task.go
internal/runtime/executor/task_test.go
internal/service/frontend/api/v1/dagruns.go
internal/service/frontend/api/v1/dags.go
internal/service/frontend/api/v1/dags_test.go
internal/service/scheduler/dag_executor.go
internal/service/scheduler/dag_executor_test.go
internal/service/scheduler/queue_processor_startup_test.go
internal/service/scheduler/queue_processor_test.go
internal/service/scheduler/scheduler.go
internal/service/worker/remote_handler.go
internal/service/worker/remote_handler_test.go
internal/test/reschedule_inline_helpers.go
proto/coordinator/v1/coordinator.proto
Summary
Adds an `agent_snapshot` payload to distributed coordinator tasks so workers can execute agent steps without relying on local agent files.

Root cause
In shared-nothing distributed execution, workers only received the DAG definition and base config. Agent settings such as config, models, skills, souls, and memory lived in local filesystem stores on the coordinator or scheduler, so workers could not resolve the data required for agent step execution. Skill and soul stores were also absent on the worker side, which made distributed agent execution fail even when the DAG itself was dispatched correctly.
Test plan
make bin
go test ./... -count=1

Summary by CodeRabbit
New Features
Bug Fixes & Improvements