fix: reduce start-all CPU usage and event-driven SSE#2031

Merged
yottahmd merged 8 commits into main from fix/start-all-cpu-sse-eventstore on Apr 23, 2026
Conversation

yottahmd (Collaborator) commented Apr 23, 2026

Summary

  • replace coordinator startup historical DAG-run scans with lease and active-index reconciliation
  • switch run-driven SSE topics to event-store-driven on-demand refresh
  • emit non-notification DAG-run update events so same-status progress still wakes browser subscribers

Root Cause

start-all started the coordinator zombie detector by backfilling active distributed runs through a history-wide ListStatuses(... WithAllHistory, WithoutLimit) scan. On large local stores this made startup CPU scale with the total number of historical DAG runs rather than with the number of active distributed attempts.
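The fix above walks only the active leases (O(active)) instead of every historical run (O(history)). A minimal sketch of that reconciliation idea, where all types, field names, and states are hypothetical stand-ins for dagu's real structures:

```go
package main

import "fmt"

// Lease and Status are simplified stand-ins for dagu's real types.
type Lease struct {
	RunID     string
	AttemptID string
}

type Status struct {
	RunID     string
	AttemptID string
	State     string // "running", "queued", "succeeded", ...
}

// identityMatches mirrors the idea behind LeaseIdentityMatchesStatus:
// a lease only governs the specific attempt it was issued for.
func identityMatches(l Lease, s Status) bool {
	return l.RunID == s.RunID && l.AttemptID == s.AttemptID
}

// reconcile iterates the lease set rather than all historical runs,
// keeping only identity-matched, still-active runs and implicitly
// dropping leases for missing, superseded, or terminal runs.
func reconcile(leases []Lease, statuses map[string]Status) []string {
	var active []string
	for _, l := range leases {
		s, ok := statuses[l.RunID]
		if !ok || !identityMatches(l, s) {
			continue // missing or superseded attempt: drop the lease
		}
		if s.State == "running" || s.State == "queued" {
			active = append(active, l.RunID)
		}
	}
	return active
}

func main() {
	leases := []Lease{{"r1", "a1"}, {"r2", "a1"}, {"r3", "a1"}}
	statuses := map[string]Status{
		"r1": {"r1", "a1", "running"},
		"r2": {"r2", "a2", "running"}, // superseded attempt
		"r3": {"r3", "a1", "succeeded"},
	}
	fmt.Println(reconcile(leases, statuses)) // only r1 survives
}
```

The cost of one reconciliation pass is bounded by the number of leases, regardless of how many completed runs the local store has accumulated.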

Validation

  • go test ./internal/service/eventstore ./internal/persis/fileeventstore ./internal/persis/filedagrun
  • go test ./internal/core/exec ./internal/service/coordinator
  • go test ./internal/service/frontend ./internal/service/frontend/sse ./internal/service/frontend/api/v1
  • make bin
  • CPU profile of dagu start-all --cpu-profile against ~/.dagu after the fix: coordinator backfill no longer appears in the profile; filedagrun.(*Store).ListStatuses dropped to a 10ms scheduler retry-scanner sample.

Summary by CodeRabbit

  • New Features

    • Added a DAG-run "updated" event type and deterministic update event IDs.
  • Improvements

    • Durable stale-lease reconciliation with stricter lease identity/freshness handling and faster recovery.
    • SSE run-related topics use on-demand invalidation; more topics publish on wake.
    • Collector ignores non-inbox temp files; task claims upsert active run tracking immediately.
    • Notification filtering now excludes DAG-run update events.
  • Tests

    • Expanded coverage for events, lease handling, SSE, collector edge cases, and notifications.

coderabbitai Bot commented Apr 23, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8670a036-de91-4d77-bf54-19d8e47aa3ea

📥 Commits

Reviewing files that changed from the base of the PR and between 9c93edb and c4c4ea0.

📒 Files selected for processing (1)
  • .github/workflows/ci.yaml

📝 Walkthrough

Walkthrough

Adds a DAG-run "updated" event type and emission, extracts lease identity checks into LeaseIdentityMatchesStatus, refactors coordinator zombie detection into durable lease reconciliation with tighter identity/worker-ID handling, switches SSE run-related topics to on-demand eventstore-driven refresh, and updates tests accordingly.

Changes

Cohort / File(s) / Summary

  • Lease identity & freshness (internal/core/exec/distributed.go)
    Added exported LeaseIdentityMatchesStatus(lease *DAGRunLease, status *DAGRunStatus, fallbackAttemptID string) bool; moved identity comparison out of freshness checks so LeaseMatchesStatus delegates identity validation to the new helper.
  • Stale-lease reconciliation / coordinator (internal/service/coordinator/handler.go, internal/service/coordinator/handler_test.go)
    Reworked zombie detection into durable lease reconciliation: list leases, resolve the latest attempt/status, delete or upsert distributed tracking based on resolution and identity, derive the remote WorkerID via a helper, and immediately upsert active tracking on AckTaskClaim; tests updated to assert reduced remote status scans and the new behaviors.
  • Eventstore: update event & IDs (internal/service/eventstore/eventstore.go, internal/service/eventstore/context.go, internal/service/eventstore/notifications.go, internal/service/eventstore/eventstore_test.go)
    Introduced TypeDAGRunUpdated and DAGRunUpdateEventID; NewDAGRunEvent records recordedAt and uses an update-specific ID/OccurredAt; EmitPersistedStatusTransitionFromContext always emits a mapped lifecycle or update event and returns the lifecycle type; notifications classification updated; tests added.
  • Lifecycle & persistence tests (internal/persis/filedagrun/attempt_test.go)
    Tests updated to expect additional TypeDAGRunUpdated events interleaved with lifecycle transitions, with adjusted reopen/restore expectations.
  • SSE & frontend (internal/service/frontend/server.go, internal/service/frontend/server_test.go)
    When an event service exists, run-related SSE topics (DAGRun, SubDAGRun, DAGHistory, DAGRuns, Queues, DAGsList) use TopicRefreshModeOnDemand; "publish on wake" is extended to DAGsList and Queues; tests validate eventstore-driven invalidation and single-fetch behavior.
  • File event collector robustness (internal/persis/fileeventstore/collector.go, internal/persis/fileeventstore/collector_test.go)
    Collector.DrainOnce ignores entries without the inbox suffix (e.g., atomic/temp files); a new test ensures temp files are ignored (not quarantined) while valid events are processed.
  • Retry test synchronization (internal/cmn/backoff/retry_test.go)
    The test deterministically cancels during backoff using signalingRetryPolicy; assertions and elapsed-time bounds updated.
  • Chatbridge notifications & tests (internal/service/chatbridge/notifications.go, internal/service/chatbridge/monitor_state_test.go)
    Treats TypeDAGRunUpdated as unknown/non-notification (not enqueued); tests use OS-aware eventual timeouts.
  • CI workflow (.github/workflows/ci.yaml)
    Adds an integration-only Docker image warm-up/pull stage for the intg suite, with retry logic, before resolving test packages.
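The collector change listed above (skipping non-inbox entries) amounts to a suffix check before draining, so files created by atomic write-then-rename are never picked up mid-write. A sketch, where the ".json" suffix and the naming conventions are assumptions for illustration, not dagu's actual inbox format:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isInboxEvent reports whether a directory entry looks like a committed
// inbox event file. Atomic writers typically create temp names (e.g.
// "evt-1.json.tmp" or ".evt-1.json.swp") and rename them into place;
// a drain pass must skip anything without the final suffix rather than
// quarantining it as corrupt.
func isInboxEvent(name string) bool {
	base := filepath.Base(name)
	return strings.HasSuffix(base, ".json") && !strings.HasPrefix(base, ".")
}

func main() {
	for _, n := range []string{"evt-1.json", "evt-2.json.tmp", ".evt-3.json"} {
		fmt.Println(n, isInboxEvent(n))
	}
}
```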

Sequence Diagram(s)

sequenceDiagram
    participant Coordinator as Coordinator
    participant LeaseStore as LeaseStore
    participant StatusStore as StatusStore
    participant EventStore as EventStore
    participant Frontend as Frontend SSE

    Coordinator->>LeaseStore: List leases
    loop per lease
        Coordinator->>StatusStore: Resolve latest attempt/status
        alt unresolved or corrupted
            Coordinator->>LeaseStore: Delete lease
            Coordinator->>StatusStore: Delete distributed tracking
        else resolved
            Coordinator->>Coordinator: LeaseIdentityMatchesStatus?
            alt identity matches AND status fresh (Running/Queued/NotStarted)
                Coordinator->>StatusStore: Upsert active distributed tracking (coordinator-owned)
            else identity mismatch OR stale
                Coordinator->>StatusStore: Mark attempt Failed
                Coordinator->>EventStore: Emit lifecycle and/or update events
                Coordinator->>LeaseStore: Delete lease
            end
        end
    end
    EventStore->>Frontend: Invalidate run-related topics
    Frontend->>StatusStore: Fetch on-demand (TopicRefreshModeOnDemand)

Estimated Code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 13.79%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4)

  • Description Check: skipped; CodeRabbit's high-level summary is enabled.
  • Title Check: the title accurately summarizes the two main objectives, reducing CPU usage during start-all and implementing event-driven SSE refresh.
  • Linked Issues Check: skipped; no linked issues were found for this pull request.
  • Out of Scope Changes Check: skipped; no linked issues were found for this pull request.


yottahmd changed the title from "fix: reduce start-all CPU usage and event-drive SSE" to "fix: reduce start-all CPU usage and event-driven SSE" on Apr 23, 2026
coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/eventstore/eventstore.go (1)

214-241: ⚠️ Potential issue | 🟡 Minor

Retention is already configured; clarify the deduplication mechanism.

The concern about unique TypeDAGRunUpdated IDs is valid: because DAGRunUpdateEventID includes recordedAt (RFC3339Nano), each update event receives a distinct ID. However, the mechanism at internal/persis/fileeventstore/notifications.go:297-312 (selectNewestDAGRunEvent) doesn't deduplicate these events—it deduplicates by ID, so events with unique IDs all persist.

That said, retention is already addressed: EventStoreConfig.RetentionDays defaults to 1 day (see internal/cmn/config/config.go:222), and cleanupExpired() removes committed event log files older than this cutoff uniformly across all event types. Long-running DAGs with frequent same-status updates will accumulate distinct dag.run.updated events up to the retention boundary, then be pruned.

No evidence of cursor-based read starvation in ReadDAGRunEvents; it returns all available events without pagination limits. If concerned about burst behavior, consider documenting the design assumption that update events are transient within the retention window.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/eventstore.go` around lines 214 - 241, The bug is
that DAGRunUpdateEventID includes recordedAt (RFC3339Nano) so every update gets
a unique ID and cannot be deduplicated by the existing selectNewestDAGRunEvent
(which dedupes by ID); change DAGRunUpdateEventID to drop recordedAt (so it uses
the same stableID inputs as non-update DAG run events) and ensure NewDAGRunEvent
still switches to DAGRunUpdateEventID when eventType == TypeDAGRunUpdated so
subsequent status-update events produce the same ID and can be deduplicated by
selectNewestDAGRunEvent.
🧹 Nitpick comments (1)
internal/service/coordinator/handler.go (1)

2141-2213: State machine reads cleanly; one optional consistency nit.

The reconciliation covers the expected cases (missing attempt, nil status, superseded identity, fresh vs. stale per-status branches, terminal fall-through to delete tracking).

Minor consistency suggestion: the error switch at Line 2151-2167 only maps ErrDAGRunIDNotFound and ErrNoStatusData to the "delete tracking" path. createAttemptForTask (Line 419) additionally treats exec.ErrCorruptedStatusFile as equivalent to "no longer exists". If a status file becomes corrupted, the current code will log on every tick and leave the lease record indefinitely. Consider adding errors.Is(err, exec.ErrCorruptedStatusFile) to the missing-run branch for parity.

♻️ Optional diff
 	attempt, runStatus, err := h.resolveLatestAttempt(ctx, lease.DAGRun.Name, lease.DAGRun.ID, lease.Root)
 	switch {
 	case err == nil:
-	case errors.Is(err, exec.ErrDAGRunIDNotFound), errors.Is(err, exec.ErrNoStatusData):
+	case errors.Is(err, exec.ErrDAGRunIDNotFound),
+		errors.Is(err, exec.ErrNoStatusData),
+		errors.Is(err, exec.ErrCorruptedStatusFile):
 		h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), lease.DAGRun, lease.AttemptKey,
 			"Failed to delete distributed lease for missing leased run",
 			"Failed to delete active distributed run for missing leased run",
 		)
 		return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/coordinator/handler.go` around lines 2141 - 2213, The error
handling in reconcileDistributedLease's switch on err should treat
exec.ErrCorruptedStatusFile the same as exec.ErrDAGRunIDNotFound and
exec.ErrNoStatusData so corrupted status files trigger deleteDistributedTracking
instead of repeated errors; update the switch in reconcileDistributedLease to
include errors.Is(err, exec.ErrCorruptedStatusFile) in the missing-run branch
(parity with createAttemptForTask which already treats
exec.ErrCorruptedStatusFile as "no longer exists"), so the code calls
h.deleteDistributedTracking(...) for corrupted status errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2cbed889-d19d-4c0e-82be-305bac082e7a

📥 Commits

Reviewing files that changed from the base of the PR and between 88a1907 and 27b9033.

📒 Files selected for processing (10)
  • internal/core/exec/distributed.go
  • internal/persis/filedagrun/attempt_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/eventstore/context.go
  • internal/service/eventstore/eventstore.go
  • internal/service/eventstore/eventstore_test.go
  • internal/service/eventstore/notifications.go
  • internal/service/frontend/server.go
  • internal/service/frontend/server_test.go

yottahmd (Collaborator, Author) commented:
@coderabbitai review

coderabbitai Bot commented Apr 23, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai Bot left a comment


🧹 Nitpick comments (1)
internal/service/coordinator/handler.go (1)

2197-2223: Optional: collapse the identical Running/NotStarted and Queued switch arms.

Both branches have byte-identical bodies; merging them removes duplication without losing the terminal/default handling.

♻️ Proposed refactor
 	switch runStatus.Status {
-	case core.Running, core.NotStarted:
-		if lease.IsFresh(now, h.staleLeaseThreshold) {
-			h.upsertActiveDistributedRun(ctx, runStatus, workerID, attemptID)
-			return
-		}
-	case core.Queued:
+	case core.Running, core.NotStarted, core.Queued:
 		if lease.IsFresh(now, h.staleLeaseThreshold) {
 			h.upsertActiveDistributedRun(ctx, runStatus, workerID, attemptID)
 			return
 		}
 	case core.Failed, core.Aborted, core.Succeeded, core.PartiallySucceeded, core.Waiting, core.Rejected:

Feel free to ignore if you intend to diverge the Queued handling later (e.g., separate staleness threshold for queue dispatch).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/coordinator/handler.go` around lines 2197 - 2223, The switch
on runStatus.Status duplicates the same body for core.Running, core.NotStarted
and core.Queued; collapse those cases into a single case list (case
core.Running, core.NotStarted, core.Queued:) so they all call lease.IsFresh(now,
h.staleLeaseThreshold) and h.upsertActiveDistributedRun(ctx, runStatus,
workerID, attemptID) identically, leaving the existing failure/default branches
(h.deleteDistributedTracking(...)) and the final h.markStatusLeaseRunFailed(...)
untouched; update only the switch labels around runStatus.Status and keep
references to lease.IsFresh, h.staleLeaseThreshold,
h.upsertActiveDistributedRun, h.deleteDistributedTracking,
h.markStatusLeaseRunFailed and staleDistributedLeaseReason.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c5e78543-fda5-4a84-8b90-3b51a2ecb621

📥 Commits

Reviewing files that changed from the base of the PR and between 27b9033 and 06e3ca1.

📒 Files selected for processing (9)
  • internal/cmn/backoff/retry_test.go
  • internal/persis/fileeventstore/collector.go
  • internal/persis/fileeventstore/collector_test.go
  • internal/service/chatbridge/monitor_state_test.go
  • internal/service/chatbridge/notifications.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/eventstore/eventstore.go
  • internal/service/eventstore/eventstore_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/service/eventstore/eventstore.go

yottahmd (Collaborator, Author) commented:
@coderabbitai review

coderabbitai Bot commented Apr 23, 2026

✅ Actions performed

Review triggered.


yottahmd (Collaborator, Author) commented:
@coderabbitai review

coderabbitai Bot commented Apr 23, 2026

✅ Actions performed

Review triggered.


@yottahmd yottahmd merged commit 6a09552 into main Apr 23, 2026
10 checks passed
@yottahmd yottahmd deleted the fix/start-all-cpu-sse-eventstore branch April 23, 2026 04:14