Skip to content

fix: distribute agent snapshots to shared-nothing workers#1986

Merged
yottahmd merged 4 commits intomainfrom
feat/agent-worker-snapshot
Apr 10, 2026
Merged

fix: distribute agent snapshots to shared-nothing workers#1986
yottahmd merged 4 commits intomainfrom
feat/agent-worker-snapshot

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Apr 10, 2026

Summary

  • add a bounded, versioned agent_snapshot payload to distributed coordinator tasks so workers can execute agent steps without relying on local agent files
  • snapshot the required agent config, models, skills, souls, and read-only memory at dispatch time for frontend start/retry, scheduler dispatch, and distributed sub-DAG dispatch
  • hydrate read-only in-memory agent stores on workers and make snapshot-backed tasks fail fast when the snapshot is invalid instead of silently falling back to unrelated local state
  • mark snapshot-backed memory as read-only execution context so agent prompts stop advertising writable memory paths during distributed runs

Root cause

In shared-nothing distributed execution, workers only received the DAG definition and base config. Agent settings such as config, models, skills, souls, and memory lived in local filesystem stores on the coordinator or scheduler, so workers could not resolve the data required for agent step execution. Skill and soul stores were also absent on the worker side, which made distributed agent execution fail even when the DAG itself was dispatched correctly.

Test plan

  • make bin
  • go test ./... -count=1

Summary by CodeRabbit

  • New Features

    • Added distributed agent snapshot capability: agent configurations, models, skills, and memory can now be sent to workers for execution.
    • Introduced read-only memory mode for agents, preventing memory modifications during execution.
    • Added configurable snapshot size limits (4MB default) for distributed execution.
  • Bug Fixes & Improvements

    • Enhanced task initialization error reporting for snapshot hydration failures.
    • Implemented defensive copying and stable ordering for snapshot-backed store operations.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 10, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e228c032-6a46-41c1-b751-79557b0261d5

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces distributed agent snapshots—a versioned, gzip-compressed wire format capturing agent configuration, models, skills, souls, and memory state for execution across distributed workers. It adds snapshot building, serialization, and read-only store wrappers, integrates snapshots into the distributed task dispatch pipeline, and enables worker nodes to hydrate agent resources from snapshots instead of local stores.

Changes

Cohort / File(s) Summary
Snapshot infrastructure
internal/agent/snapshot.go, internal/agent/snapshot_stores.go, internal/agent/snapshot_stores_test.go, internal/agent/snapshot_test.go
Core snapshot implementation: wire format with version/config/models/skills/souls/memory envelope; serialization with gzip compression; DAG-scoped snapshot building with size limits and store requirement validation; read-only in-memory store wrappers for configs/models/skills/souls/memory indexed by ID and sorted by name; snapshot-backed search with pagination and case-insensitive matching.
Agent model and system prompt
internal/agent/model_config.go, internal/agent/system_prompt.txt, internal/agent/system_prompt_test.go
Added ReadOnly bool field to MemoryContent; updated system prompt template with conditional memory-mode guidance (read-only instructions, disabled memory management section); new test case for read-only memory handling.
Snapshot dispatch entry points
internal/agentsnapshot/dispatch.go, internal/agentsnapshot/dispatch_test.go
Two exported snapshot builders: BuildFromPaths (filesystem-backed stores) and BuildFromContext (pre-injected runtime stores); DAG traversal to determine snapshot necessity; propagates snapshot errors and enforces size limits.
Distributed task dispatch integration
internal/runtime/executor/dag_runner.go, internal/runtime/executor/task.go, internal/runtime/executor/task_test.go, internal/service/frontend/api/v1/dags.go, internal/service/frontend/api/v1/dagruns.go, internal/service/scheduler/dag_executor.go, internal/runtime/builtin/agentstep/executor.go
Snapshot building during coordinator task construction; new WithAgentSnapshot task option; updated coordinatorTaskOptions signature to return error; snapshot builder passed through DAGExecutor constructor; memory store read-only flag detection in agent executor.
Scheduler and queue processor
internal/service/scheduler/scheduler.go, internal/service/scheduler/queue_processor_startup_test.go, internal/service/scheduler/queue_processor_test.go, internal/service/scheduler/dag_executor_test.go
Scheduler extracts DAGStore from entry reader and passes snapshot builder to DAGExecutor; test call sites updated to match new constructor signature with snapshot builder parameter.
Worker snapshot consumption
internal/service/worker/remote_handler.go, internal/service/worker/remote_handler_test.go
Worker extracts snapshot from coordinator task; executeDAGRun accepts snapshot and conditionally hydrates stores via agentStoresFromSnapshot (unmarshals snapshot, validates config/models, returns skill/soul/memory); extended agent store bundle with skill and soul stores; task init errors reported via reportTaskInitFailure; comprehensive test coverage for snapshot hydration and init-failure reporting.
Test infrastructure and protocol
proto/coordinator/v1/coordinator.proto, internal/intg/queue/queue_test.go, internal/test/reschedule_inline_helpers.go, internal/service/frontend/api/v1/dags_test.go
Added agent_snapshot field to Task protobuf message; test call sites updated to supply new snapshotBuilder parameter to NewDAGExecutor.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • fix: route reschedule through enqueue #1966: Modifies the distributed dispatch path in internal/service/frontend/api/v1/dagruns.go and coordinator task options construction; related through overlapping integration points in retry and start dispatch flows.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.58% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding distributed agent snapshots to enable shared-nothing worker execution. It is specific, concise, and clearly reflects the primary objective.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/agent-worker-snapshot

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (2)
internal/runtime/executor/task.go (1)

116-121: Defensively copy the snapshot bytes before attaching to the task.

[]byte is mutable; assigning it directly can leak caller-side mutations into the dispatched payload. Copying here makes task construction safer.

♻️ Proposed change
 func WithAgentSnapshot(snapshot []byte) TaskOption {
 	return func(task *coordinatorv1.Task) {
-		task.AgentSnapshot = snapshot
+		if len(snapshot) == 0 {
+			task.AgentSnapshot = nil
+			return
+		}
+		task.AgentSnapshot = append([]byte(nil), snapshot...)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/runtime/executor/task.go` around lines 116 - 121, WithAgentSnapshot
currently assigns the incoming []byte directly to
coordinatorv1.Task.AgentSnapshot which can leak mutations; change
WithAgentSnapshot to defensively copy the snapshot (handle nil by setting nil)
before assigning so the Task holds its own byte slice (e.g., allocate a new
[]byte of len(snapshot) and copy into it) to prevent caller-side mutations from
affecting the task.
internal/service/scheduler/scheduler.go (1)

116-119: Prefer an interface-bound DAGStore dependency here.

Snapshot enrichment now depends on er being *entryReaderImpl. With any other EntryReader, external sub-DAG resolution disappears and scheduler-built snapshots can become incomplete. Passing exec.DAGStore into newScheduler directly, or exposing it on the interface, would avoid that hidden coupling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/scheduler.go` around lines 116 - 119, The current
code casts er to *entryReaderImpl to grab impl.dagStore which hides a dependency
and breaks snapshot enrichment for other EntryReader implementations; change
newScheduler to accept an exec.DAGStore parameter (or add a DAGStore()
exec.DAGStore method to the EntryReader interface) and use that explicit
DAGStore inside newScheduler/scheduler construction instead of the type
assertion, updating call sites that construct the scheduler to pass the DAGStore
through.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/agent/snapshot.go`:
- Around line 422-427: The code silently ignores errors from the resolve
function in the r.walk path, causing BuildSnapshotForDAG to miss nested agent
steps; change r.walk (and its callers, including BuildSnapshotForDAG) to return
an error and, where resolve != nil, if resolve(ctx, target) returns a non-nil
err then return that error instead of skipping the branch; keep the existing
behavior only when resolve == nil (no-op), but ensure all call sites propagate
the returned error up so sub-DAG resolution failures surface during snapshot
planning.
- Around line 207-215: The snapshot currently uses reqs.sortedDAGNames() when
needsMemory is set by step.Agent.Memory.Enabled, which pulls memory from all
visited DAGs; instead track DAGs that opted into agent memory separately (e.g.,
build a reqs.memoryDAGNames or reqs.sortedMemoryDAGNames() collection while
evaluating step.Agent.Memory.Enabled) and pass that collection into
snapshotMemory(ctx, stores.MemoryStore, reqs.sortedMemoryDAGNames()) so
snapshot.Memory only contains DAG-scoped memory that explicitly enabled agent
memory; apply the same change wherever snapshotMemory or memory population is
invoked (the other occurrences referenced around the later blocks) and ensure
needsMemory remains driven by step.Agent.Memory.Enabled but uses the new
memory-specific name list for extraction and snapshot.Memory assignment.
- Around line 258-268: The helpers snapshotSkills and snapshotSouls currently
treat ErrSkillNotFound/ErrSoulNotFound as success and skip missing entries;
change them to fail-fast by returning an error when store.GetByID returns
ErrSkillNotFound or ErrSoulNotFound instead of continuing. Update the
switch/default branches in snapshotSkills (handling SkillStore.GetByID and
ErrSkillNotFound) and the analogous logic in snapshotSouls (SoulStore.GetByID
and ErrSoulNotFound) to return a descriptive wrapped error (e.g.,
fmt.Errorf("load skill %q for snapshot: %w", id, ErrSkillNotFound)) so the
caller sees missing references rather than producing an incomplete snapshot.
- Around line 106-117: The gzip decode path currently uses io.ReadAll(zr) which
allows decompressed data to grow unbounded; replace that with a bounded read
using an io.LimitedReader (or io.LimitReader) and a package-level constant like
maxUncompressedSnapshotSize to cap the decompressed size in the function that
opens the gzip reader (the block using gzip.NewReader and variable zr). Read up
to maxSize+1 bytes so you can detect overflow, and if the read returns more than
maxUncompressedSnapshotSize, return a clear error (e.g., "agent snapshot too
large"); ensure zr is still closed in the defer and propagate other read errors
as before.

In `@internal/agent/system_prompt.txt`:
- Around line 59-64: The template emits <memory_paths> even when Memory.ReadOnly
is true, which contradicts the read-only contract; update
internal/agent/system_prompt.txt to render the <memory_paths> section only when
MemoryDir is set AND Memory.ReadOnly is false (i.e., wrap the <memory_paths>
block in a conditional that checks !Memory.ReadOnly), leaving the existing
<memory_mode> block for the read-only notice intact so the prompt advertises no
writable memory targets during snapshot-backed runs.

In `@internal/service/scheduler/dag_executor.go`:
- Around line 255-262: The buildSnapshotBuilder function currently returns nil
when dagStore is nil, disabling snapshot creation; instead always return the
closure that calls agentsnapshot.BuildFromPaths so BuildFromPaths can handle a
nil dagStore itself—update buildSnapshotBuilder (the returned anonymous func) to
be returned unconditionally and simply call agentsnapshot.BuildFromPaths(ctx,
dag, paths, dagStore) even when dagStore is nil.

---

Nitpick comments:
In `@internal/runtime/executor/task.go`:
- Around line 116-121: WithAgentSnapshot currently assigns the incoming []byte
directly to coordinatorv1.Task.AgentSnapshot which can leak mutations; change
WithAgentSnapshot to defensively copy the snapshot (handle nil by setting nil)
before assigning so the Task holds its own byte slice (e.g., allocate a new
[]byte of len(snapshot) and copy into it) to prevent caller-side mutations from
affecting the task.

In `@internal/service/scheduler/scheduler.go`:
- Around line 116-119: The current code casts er to *entryReaderImpl to grab
impl.dagStore which hides a dependency and breaks snapshot enrichment for other
EntryReader implementations; change newScheduler to accept an exec.DAGStore
parameter (or add a DAGStore() exec.DAGStore method to the EntryReader
interface) and use that explicit DAGStore inside newScheduler/scheduler
construction instead of the type assertion, updating call sites that construct
the scheduler to pass the DAGStore through.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ad64e02c-b760-41af-8cca-fb7aee116c8a

📥 Commits

Reviewing files that changed from the base of the PR and between 3b4c426 and 001d080.

⛔ Files ignored due to path filters (3)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_protoopaque.pb.go is excluded by !**/*.pb.go
  • proto/index/v1/index.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (26)
  • internal/agent/model_config.go
  • internal/agent/snapshot.go
  • internal/agent/snapshot_stores.go
  • internal/agent/snapshot_stores_test.go
  • internal/agent/snapshot_test.go
  • internal/agent/system_prompt.txt
  • internal/agent/system_prompt_test.go
  • internal/agentsnapshot/dispatch.go
  • internal/agentsnapshot/dispatch_test.go
  • internal/intg/queue/queue_test.go
  • internal/runtime/builtin/agentstep/executor.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/task.go
  • internal/runtime/executor/task_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/dags_test.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/service/scheduler/queue_processor_startup_test.go
  • internal/service/scheduler/queue_processor_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • internal/test/reschedule_inline_helpers.go
  • proto/coordinator/v1/coordinator.proto

Comment thread internal/agent/snapshot.go
Comment thread internal/agent/snapshot.go
Comment thread internal/agent/snapshot.go
Comment thread internal/agent/snapshot.go
Comment thread internal/agent/system_prompt.txt
Comment thread internal/service/scheduler/dag_executor.go
@yottahmd yottahmd merged commit 8b42bba into main Apr 10, 2026
3 checks passed
@yottahmd yottahmd deleted the feat/agent-worker-snapshot branch April 10, 2026 16:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant