fix: reduce start-all CPU usage and event-driven SSE #2031
Conversation
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (1)
📝 Walkthrough
Adds a DAG-run "updated" event type and emission, extracts lease identity checks into `LeaseIdentityMatchesStatus`, refactors coordinator zombie detection into durable lease reconciliation with tighter identity/worker-ID handling, switches SSE run-related topics to on-demand eventstore-driven refresh, and updates tests accordingly.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Coordinator as Coordinator
    participant LeaseStore as LeaseStore
    participant StatusStore as StatusStore
    participant EventStore as EventStore
    participant Frontend as Frontend SSE
    Coordinator->>LeaseStore: List leases
    loop per lease
        Coordinator->>StatusStore: Resolve latest attempt/status
        alt unresolved or corrupted
            Coordinator->>LeaseStore: Delete lease
            Coordinator->>StatusStore: Delete distributed tracking
        else resolved
            Coordinator->>Coordinator: LeaseIdentityMatchesStatus?
            alt identity matches AND status fresh (Running/Queued/NotStarted)
                Coordinator->>StatusStore: Upsert active distributed tracking (coordinator-owned)
            else identity mismatch OR stale
                Coordinator->>StatusStore: Mark attempt Failed
                Coordinator->>EventStore: Emit lifecycle and/or update events
                Coordinator->>LeaseStore: Delete lease
            end
        end
    end
    EventStore->>Frontend: Invalidate run-related topics
    Frontend->>StatusStore: Fetch on-demand (TopicRefreshModeOnDemand)
```
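As a rough illustration of the diagram's final steps, the sketch below shows one way an on-demand SSE topic could work: cache the last payload and refetch from the status store only after an eventstore invalidation. `TopicRefreshModeOnDemand` appears in the diagram; everything else here (`topicState`, `Invalidate`, `Snapshot`) is a hypothetical stand-in, not the PR's actual API.

```go
// Hypothetical sketch: an SSE topic that refetches from the status store
// only when the eventstore signals an invalidation, instead of polling.
package sse

import "sync"

type RefreshMode int

const (
	TopicRefreshModePolling  RefreshMode = iota
	TopicRefreshModeOnDemand // refresh only when invalidated by the eventstore
)

type topicState struct {
	mu    sync.Mutex
	mode  RefreshMode
	dirty bool                   // set by Invalidate, cleared after a refetch
	fetch func() ([]byte, error) // on-demand fetch from the status store
	last  []byte                 // last payload sent to SSE subscribers
}

// Invalidate is called when the eventstore emits a run-related event
// (e.g. the new dag.run.updated type), marking the topic for refresh.
func (t *topicState) Invalidate() {
	t.mu.Lock()
	t.dirty = true
	t.mu.Unlock()
}

// Snapshot returns the cached payload unless an invalidation forces a
// refetch, so idle topics cost nothing between events.
func (t *topicState) Snapshot() ([]byte, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.mode != TopicRefreshModeOnDemand || !t.dirty {
		return t.last, nil
	}
	data, err := t.fetch()
	if err != nil {
		return nil, err
	}
	t.last, t.dirty = data, false
	return data, nil
}
```

The point of this shape is that per-topic cost is driven by event volume, not by a polling interval, which matches the PR's goal of reducing idle CPU.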
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/service/eventstore/eventstore.go (1)
214-241: ⚠️ Potential issue | 🟡 Minor
Retention is already configured; clarify the deduplication mechanism.
The concern about unique `TypeDAGRunUpdated` IDs is valid: because `DAGRunUpdateEventID` includes `recordedAt` (RFC3339Nano), each update event receives a distinct ID. However, the mechanism at `internal/persis/fileeventstore/notifications.go:297-312` (`selectNewestDAGRunEvent`) doesn't deduplicate these events; it deduplicates by ID, so events with unique IDs all persist.
That said, retention is already addressed: `EventStoreConfig.RetentionDays` defaults to 1 day (see `internal/cmn/config/config.go:222`), and `cleanupExpired()` removes committed event log files older than this cutoff uniformly across all event types. Long-running DAGs with frequent same-status updates will accumulate distinct `dag.run.updated` events up to the retention boundary, then be pruned.
No evidence of cursor-based read starvation in `ReadDAGRunEvents`; it returns all available events without pagination limits. If concerned about burst behavior, consider documenting the design assumption that update events are transient within the retention window.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/eventstore/eventstore.go` around lines 214 - 241, The bug is that DAGRunUpdateEventID includes recordedAt (RFC3339Nano) so every update gets a unique ID and cannot be deduplicated by the existing selectNewestDAGRunEvent (which dedupes by ID); change DAGRunUpdateEventID to drop recordedAt (so it uses the same stableID inputs as non-update DAG run events) and ensure NewDAGRunEvent still switches to DAGRunUpdateEventID when eventType == TypeDAGRunUpdated so subsequent status-update events produce the same ID and can be deduplicated by selectNewestDAGRunEvent.
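To make the ID discussion concrete, here is a minimal sketch of the two schemes under a simplified event struct. `DAGRunUpdateEventID` and `selectNewestDAGRunEvent` are real names from the comment above, but the struct shape and helper functions below are illustrative assumptions, not the repository's actual code.

```go
// Hypothetical sketch of the two event-ID schemes discussed above.
package eventstore

import (
	"fmt"
	"time"
)

type DAGRunEvent struct {
	Name       string
	RunID      string
	AttemptID  string
	RecordedAt time.Time
}

// timestampedID mirrors the problematic scheme: including recordedAt
// (RFC3339Nano) makes every status update a distinct event, so ID-based
// deduplication keeps all of them.
func timestampedID(e DAGRunEvent) string {
	return fmt.Sprintf("%s/%s/%s/%s", e.Name, e.RunID, e.AttemptID,
		e.RecordedAt.Format(time.RFC3339Nano))
}

// stableID mirrors the suggested fix: use the same inputs as non-update
// DAG run events, so repeated updates for one attempt collapse to one ID
// and a newest-wins pass (like selectNewestDAGRunEvent) can dedupe them.
func stableID(e DAGRunEvent) string {
	return fmt.Sprintf("%s/%s/%s", e.Name, e.RunID, e.AttemptID)
}
```

With the stable form, repeated `dag.run.updated` events for the same attempt share an ID, so a newest-wins selection collapses them instead of accumulating one event per status write.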
🧹 Nitpick comments (1)
internal/service/coordinator/handler.go (1)
2141-2213: State machine reads cleanly; one optional consistency nit.
The reconciliation covers the expected cases (missing attempt, nil status, superseded identity, fresh vs. stale per-status branches, terminal fall-through to delete tracking).
Minor consistency suggestion: the error switch at lines 2151-2167 only maps `ErrDAGRunIDNotFound` and `ErrNoStatusData` to the "delete tracking" path. `createAttemptForTask` (line 419) additionally treats `exec.ErrCorruptedStatusFile` as equivalent to "no longer exists". If a status file becomes corrupted, the current code will log on every tick and leave the lease record indefinitely. Consider adding `errors.Is(err, exec.ErrCorruptedStatusFile)` to the missing-run branch for parity.
♻️ Optional diff
```diff
 attempt, runStatus, err := h.resolveLatestAttempt(ctx, lease.DAGRun.Name, lease.DAGRun.ID, lease.Root)
 switch {
 case err == nil:
-case errors.Is(err, exec.ErrDAGRunIDNotFound), errors.Is(err, exec.ErrNoStatusData):
+case errors.Is(err, exec.ErrDAGRunIDNotFound),
+	errors.Is(err, exec.ErrNoStatusData),
+	errors.Is(err, exec.ErrCorruptedStatusFile):
 	h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), lease.DAGRun, lease.AttemptKey,
 		"Failed to delete distributed lease for missing leased run",
 		"Failed to delete active distributed run for missing leased run",
 	)
 	return
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/coordinator/handler.go` around lines 2141 - 2213, The error handling in reconcileDistributedLease's switch on err should treat exec.ErrCorruptedStatusFile the same as exec.ErrDAGRunIDNotFound and exec.ErrNoStatusData so corrupted status files trigger deleteDistributedTracking instead of repeated errors; update the switch in reconcileDistributedLease to include errors.Is(err, exec.ErrCorruptedStatusFile) in the missing-run branch (parity with createAttemptForTask which already treats exec.ErrCorruptedStatusFile as "no longer exists"), so the code calls h.deleteDistributedTracking(...) for corrupted status errors.
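Since `LeaseIdentityMatchesStatus` anchors both the sequence diagram and this reconciliation logic but is never shown in the diffs, here is a rough sketch of what such an identity check typically compares. The field names below are guesses inferred from the surrounding discussion (worker ID, attempt key), not the function's real signature.

```go
// Hypothetical sketch of a lease/status identity check.
package coordinator

type LeaseRecord struct {
	WorkerID  string
	AttemptID string
}

type RunStatus struct {
	WorkerID  string
	AttemptID string
}

// LeaseIdentityMatchesStatus reports whether the lease still refers to the
// same attempt (and worker) that the status store knows about; a mismatch
// means the lease was superseded and should be failed and cleaned up.
func LeaseIdentityMatchesStatus(lease LeaseRecord, status RunStatus) bool {
	if lease.AttemptID != status.AttemptID {
		return false // a newer attempt took over this run
	}
	return lease.WorkerID == "" || lease.WorkerID == status.WorkerID
}
```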
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2cbed889-d19d-4c0e-82be-305bac082e7a
📒 Files selected for processing (10)
- internal/core/exec/distributed.go
- internal/persis/filedagrun/attempt_test.go
- internal/service/coordinator/handler.go
- internal/service/coordinator/handler_test.go
- internal/service/eventstore/context.go
- internal/service/eventstore/eventstore.go
- internal/service/eventstore/eventstore_test.go
- internal/service/eventstore/notifications.go
- internal/service/frontend/server.go
- internal/service/frontend/server_test.go
@coderabbitai review
✅ Actions performed: Review triggered.
🧹 Nitpick comments (1)
internal/service/coordinator/handler.go (1)
2197-2223: Optional: collapse the identical `Running`/`NotStarted` and `Queued` switch arms.
Both branches have byte-identical bodies; merging them removes duplication without losing the terminal/default handling.
♻️ Proposed refactor
```diff
 switch runStatus.Status {
-case core.Running, core.NotStarted:
-	if lease.IsFresh(now, h.staleLeaseThreshold) {
-		h.upsertActiveDistributedRun(ctx, runStatus, workerID, attemptID)
-		return
-	}
-case core.Queued:
+case core.Running, core.NotStarted, core.Queued:
 	if lease.IsFresh(now, h.staleLeaseThreshold) {
 		h.upsertActiveDistributedRun(ctx, runStatus, workerID, attemptID)
 		return
 	}
 case core.Failed, core.Aborted, core.Succeeded, core.PartiallySucceeded, core.Waiting, core.Rejected:
```
Feel free to ignore if you intend to diverge the `Queued` handling later (e.g., a separate staleness threshold for queue dispatch).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/coordinator/handler.go` around lines 2197 - 2223, The switch on runStatus.Status duplicates the same body for core.Running, core.NotStarted and core.Queued; collapse those cases into a single case list (case core.Running, core.NotStarted, core.Queued:) so they all call lease.IsFresh(now, h.staleLeaseThreshold) and h.upsertActiveDistributedRun(ctx, runStatus, workerID, attemptID) identically, leaving the existing failure/default branches (h.deleteDistributedTracking(...)) and the final h.markStatusLeaseRunFailed(...) untouched; update only the switch labels around runStatus.Status and keep references to lease.IsFresh, h.staleLeaseThreshold, h.upsertActiveDistributedRun, h.deleteDistributedTracking, h.markStatusLeaseRunFailed and staleDistributedLeaseReason.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c5e78543-fda5-4a84-8b90-3b51a2ecb621
📒 Files selected for processing (9)
- internal/cmn/backoff/retry_test.go
- internal/persis/fileeventstore/collector.go
- internal/persis/fileeventstore/collector_test.go
- internal/service/chatbridge/monitor_state_test.go
- internal/service/chatbridge/notifications.go
- internal/service/coordinator/handler.go
- internal/service/coordinator/handler_test.go
- internal/service/eventstore/eventstore.go
- internal/service/eventstore/eventstore_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/service/eventstore/eventstore.go
@coderabbitai review
✅ Actions performed: Review triggered.
@coderabbitai review
✅ Actions performed: Review triggered.
Summary
Root Cause
`start-all` started the coordinator zombie detector by backfilling active distributed runs through a history-wide `ListStatuses(... WithAllHistory, WithoutLimit)` scan. On large local stores this made startup CPU scale with all historical DAG runs instead of active distributed attempts.
Validation
- `go test ./internal/service/eventstore ./internal/persis/fileeventstore ./internal/persis/filedagrun`
- `go test ./internal/core/exec ./internal/service/coordinator`
- `go test ./internal/service/frontend ./internal/service/frontend/sse ./internal/service/frontend/api/v1`
- `make bin`
- `dagu start-all --cpu-profile` against `~/.dagu` after the fix: coordinator backfill no longer appears in the profile; `filedagrun.(*Store).ListStatuses` dropped to a 10ms scheduler retry-scanner sample.
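To illustrate the fix for the root cause described above, here is a minimal sketch of lease-driven reconciliation, where work scales with the number of active leases rather than with all history. Every type and method here (`LeaseStore`, `StatusStore`, `reconcileLeases`) is a hypothetical stand-in, not the PR's actual signatures.

```go
// Hypothetical sketch: cost proportional to active leases, not history size.
package coordinator

import "context"

type Lease struct{ Name, RunID string }

type LeaseStore interface {
	List(ctx context.Context) ([]Lease, error)
}

type StatusStore interface {
	// LatestStatus resolves one run directly; no WithAllHistory scan.
	LatestStatus(ctx context.Context, name, runID string) (string, error)
}

// reconcileLeases touches only runs that actually hold a lease, so startup
// CPU no longer scales with the total number of historical DAG runs.
func reconcileLeases(ctx context.Context, leases LeaseStore,
	statuses StatusStore, handle func(Lease, string)) error {
	ls, err := leases.List(ctx)
	if err != nil {
		return err
	}
	for _, l := range ls {
		st, err := statuses.LatestStatus(ctx, l.Name, l.RunID)
		if err != nil {
			continue // e.g. run no longer exists; real code deletes tracking
		}
		handle(l, st)
	}
	return nil
}
```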
New Features
Improvements
Tests