feat: add centralized event store#1885

Merged
yottahmd merged 7 commits into main from eventfeed
Apr 1, 2026

yottahmd (Collaborator) commented Mar 31, 2026

Summary

  • add a centralized file-backed event store with config wiring, collector support, and a new /event-logs API for querying recorded events
  • scope DAG-run events to waiting and terminal states, rotate committed event files hourly, and reduce the default event-store retention window to 3 days
  • remove the CLI events subcommand and add test coverage for event-store config, persistence, collector, and API query behavior

Testing

  • go test ./internal/service/eventstore ./internal/persis/fileeventstore ./internal/runtime/agent ./internal/cmd ./internal/service/scheduler ./internal/service/coordinator ./internal/service/frontend/api/v1 -count=1
  • go test ./internal/persis/fileeventstore -count=1
  • go test ./internal/cmn/config -count=1
  • go test ./cmd ./internal/cmd -count=1

Summary by CodeRabbit

  • New Features

    • Added centralized event log API endpoint with filtering and pagination to query operational events including DAG runs and LLM usage records.
    • Introduced file-based event storage with automatic retention cleanup.
  • Configuration

    • New event_store settings: enabled flag (default on) and retention_days (default 3 days).
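
To make the stated defaults concrete, the following Go sketch shows one way an enabled flag and a retention window could translate into a cleanup cutoff. The names `EventStoreConfig`, `defaultEventStoreConfig`, and `retentionCutoff` are hypothetical, and treating a non-positive retention as "skip cleanup" is an assumption, not something the PR states.

```go
package main

import (
	"fmt"
	"time"
)

// EventStoreConfig is a hypothetical mirror of the event_store settings.
type EventStoreConfig struct {
	Enabled       bool
	RetentionDays int
}

// defaultEventStoreConfig returns the defaults stated in the summary:
// enabled, with a 3-day retention window.
func defaultEventStoreConfig() EventStoreConfig {
	return EventStoreConfig{Enabled: true, RetentionDays: 3}
}

// retentionCutoff returns the instant before which files count as expired.
// The second result is false when cleanup should be skipped (assumed here
// for a disabled store or a non-positive retention).
func retentionCutoff(cfg EventStoreConfig, now time.Time) (time.Time, bool) {
	if !cfg.Enabled || cfg.RetentionDays <= 0 {
		return time.Time{}, false
	}
	return now.UTC().AddDate(0, 0, -cfg.RetentionDays), true
}

func main() {
	now := time.Date(2026, time.April, 1, 12, 0, 0, 0, time.UTC)
	cutoff, ok := retentionCutoff(defaultEventStoreConfig(), now)
	fmt.Println(cutoff.Format(time.RFC3339), ok) // 2026-03-29T12:00:00Z true
}
```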

coderabbitai Bot commented Mar 31, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: acd4ced8-aa42-443d-8d43-b3d8dc0c0a00

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR introduces a centralized event logging system to Dagu. It adds a new API endpoint for querying events, file-backed event persistence, event service layer infrastructure, configuration for event store settings, and integration points to emit events from DAG execution and LLM interactions.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| API Specifications & Generated Code | api/v1/api.yaml, api/v1/api.gen.go | Added new events tag, GET /event-logs endpoint with query filters, and corresponding OpenAPI schemas (EventLogEntry, EventLogsResponse). Generated Go types, handler interfaces, and route bindings for the new endpoint. |
| Event Store Core | internal/persis/fileeventstore/store.go, internal/persis/fileeventstore/store_test.go | File-backed event store with Emit and Query methods. Handles JSON serialization to inbox, log file management, filtering, sorting, and pagination. Tests verify inbox writes, query filtering with malformed-file skipping, and legacy daily file compatibility. |
| Event Collector | internal/persis/fileeventstore/collector.go, internal/persis/fileeventstore/collector_test.go | Background collector that drains inbox files into hour-bucketed JSONL logs, deduplicates via seen-ID tracking, enforces retention policies, and quarantines malformed files. Tests validate draining, deduplication after restart, malformed-file handling, and retention cleanup. |
| Event Service Layer | internal/service/eventstore/eventstore.go, internal/service/eventstore/eventstore_test.go, internal/service/eventstore/context.go | In-process event model with Event struct and normalization/validation. Service wrapper with Emit and Query. Context helpers for attaching event metadata and emitting from persisted DAG run status. |
| Configuration | internal/cmn/config/config.go, internal/cmn/config/definition.go, internal/cmn/config/loader.go, internal/cmn/config/key_hints.go, internal/cmn/config/path.go, internal/cmn/config/loader_test.go, internal/cmn/config/path_test.go, internal/cmn/schema/config.schema.json | Added EventStore config with Enabled and RetentionDays fields, EventStoreDir path, environment variable bindings, and JSON schema definitions. Defaults: enabled, 3-day retention. |
| Frontend API Integration | internal/service/frontend/api/v1/api.go, internal/service/frontend/api/v1/events.go, internal/service/frontend/api/v1/events_test.go, internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/server.go | Added ListEventLogs endpoint handler with authorization checks, filter validation, pagination bounds, and response mapping. Wired event service initialization and injection throughout. Updated DAG-run mutation handlers to use event-augmented context. |
| Service Integration | internal/service/coordinator/handler.go, internal/service/coordinator/service.go, internal/service/scheduler/scheduler.go | Added event service wiring to coordinator and scheduler. Wrapped contexts with event metadata to propagate event source information through service lifecycles. |
| Command & Startup | internal/cmd/context.go, internal/cmd/coord.go, internal/cmd/scheduler.go, internal/cmd/startall.go, internal/cmd/status_test.go | Added EventService and EventSourceInstance to command context. Added WithEventSource helper to bind event metadata. Updated coordinator/scheduler initialization to use event-source-bound contexts. |
| Core Execution & Agent | internal/core/exec/enqueue_retry.go, internal/persis/filedagrun/attempt.go, internal/agent/api.go | Added OnQueued callback to EnqueueRetryOptions. Added event emission on message persistence for LLM usage tracking. Extended agent API to accept and wire event service, with helper to extract session metadata. |
| Test Utilities | internal/test/helper.go | Added EventStoreDir initialization and YAML config output for test environments. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Server as Frontend Server
    participant EventService as Event Service
    participant FileStore as File Event Store
    participant Collector as Event Collector
    
    Note over Client,Collector: Event Emission Flow
    rect rgba(100, 200, 100, 0.5)
    DAGExecutor->>EventService: Emit(ctx, event)
    EventService->>EventService: Normalize() & Validate()
    EventService->>FileStore: Emit(ctx, event)
    FileStore->>FileStore: Assign RecordedAt & ID
    FileStore->>FileStore: Write to Inbox
    Note right of FileStore: Async inbox file
    end
    
    rect rgba(100, 150, 200, 0.5)
    Note over Client,Collector: Background Collection
    Collector->>Collector: DrainOnce() at interval
    Collector->>FileStore: Read inbox files
    Collector->>Collector: Parse & validate JSON
    Collector->>Collector: Deduplicate by ID
    Collector->>Collector: Group by hour
    Collector->>FileStore: Write hour-bucketed logs
    Collector->>Collector: Track seenIDs
    end
    
    rect rgba(200, 150, 100, 0.5)
    Note over Client,Collector: Query Flow
    Client->>Server: GET /event-logs?filters
    Server->>EventService: Query(ctx, filter)
    EventService->>FileStore: Query(ctx, filter)
    FileStore->>FileStore: List log files by time window
    FileStore->>FileStore: Parse & filter entries
    FileStore->>FileStore: Sort & paginate
    FileStore-->>EventService: QueryResult
    EventService-->>Server: QueryResult
    Server-->>Client: ListEventLogs200JSONResponse
    end
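
The emission and collection flows in the diagram can be sketched in a few dozen lines of Go. This is a simplified illustration under stated assumptions, not the PR's implementation: the `Event` fields, the `emit` and `drainOnce` helpers, the `events-<hour>.jsonl` file naming, and the `dag.run.succeeded` type string are all hypothetical, and quarantine and retention are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// Event is a hypothetical, trimmed-down event record.
type Event struct {
	ID         string    `json:"id"`
	Type       string    `json:"type"`
	RecordedAt time.Time `json:"recorded_at"`
}

// emit writes one event as its own inbox file. Writing to a temp name and
// renaming keeps the collector from seeing a half-written file.
func emit(inboxDir string, ev Event) error {
	raw, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	tmp := filepath.Join(inboxDir, ev.ID+".tmp")
	if err := os.WriteFile(tmp, raw, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, filepath.Join(inboxDir, ev.ID+".json"))
}

// drainOnce moves inbox events into hour-bucketed JSONL files, skipping IDs
// already in seen. It returns how many events were appended.
func drainOnce(inboxDir, logDir string, seen map[string]struct{}) (int, error) {
	paths, err := filepath.Glob(filepath.Join(inboxDir, "*.json"))
	if err != nil {
		return 0, err
	}
	appended := 0
	for _, p := range paths {
		raw, err := os.ReadFile(p)
		if err != nil {
			return appended, err
		}
		var ev Event
		if err := json.Unmarshal(raw, &ev); err != nil || ev.ID == "" {
			continue // a real collector would quarantine malformed files
		}
		if _, dup := seen[ev.ID]; !dup {
			hour := ev.RecordedAt.UTC().Format("2006010215")
			logPath := filepath.Join(logDir, "events-"+hour+".jsonl")
			f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600)
			if err != nil {
				return appended, err
			}
			_, werr := f.Write(append(raw, '\n'))
			cerr := f.Close()
			if werr != nil {
				return appended, werr
			}
			if cerr != nil {
				return appended, cerr
			}
			seen[ev.ID] = struct{}{}
			appended++
		}
		_ = os.Remove(p) // drop the inbox file once committed or known duplicate
	}
	return appended, nil
}

// runDemo emits two events, drains, re-emits one of them, and drains again.
func runDemo() (first, second int, err error) {
	root, err := os.MkdirTemp("", "eventfeed-sketch")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(root)
	inbox, logs := filepath.Join(root, "inbox"), filepath.Join(root, "logs")
	for _, d := range []string{inbox, logs} {
		if err := os.MkdirAll(d, 0o700); err != nil {
			return 0, 0, err
		}
	}
	at := time.Date(2026, time.March, 31, 9, 30, 0, 0, time.UTC)
	seen := map[string]struct{}{}
	for _, id := range []string{"e1", "e2", "e1"} {
		if id == "e1" && len(seen) > 0 {
			// Second pass for e1: simulates a retried emit after the first drain.
			if err = emit(inbox, Event{ID: id, Type: "dag.run.succeeded", RecordedAt: at}); err != nil {
				return first, 0, err
			}
			second, err = drainOnce(inbox, logs, seen)
			return first, second, err
		}
		if err = emit(inbox, Event{ID: id, Type: "dag.run.succeeded", RecordedAt: at}); err != nil {
			return 0, 0, err
		}
		if id == "e2" {
			if first, err = drainOnce(inbox, logs, seen); err != nil {
				return first, 0, err
			}
		}
	}
	return first, second, nil
}

func main() {
	first, second, err := runDemo()
	fmt.Println(first, second, err) // 2 0 <nil>
}
```

The second drain appends nothing because `e1` is already in the seen set, which is the deduplication step the diagram labels "Deduplicate by ID".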

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • feat: add DAG-level retry policy #1774: Modifies internal/core/exec/enqueue_retry.go to change callback signatures and options handling, directly overlapping with the OnQueued callback introduced in this PR for retry event emission.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 30.30% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title 'feat: add centralized event store' directly and clearly summarizes the main change: introducing a centralized event store system. The title is concise, follows conventional commit format, and accurately represents the primary feature being added. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 13

🧹 Nitpick comments (3)
internal/cmn/config/key_hints.go (1)

139-141: Consider adding the missing eventstore.enabled legacy key mapping.

For completeness with other eventstore.* legacy keys, consider adding a mapping for eventstore.enabled:

🔧 Proposed fix
 	"audit.retentiondays":      "audit.retention_days",
 	"eventstore":               "event_store",
+	"eventstore.enabled":       "event_store.enabled",
 	"eventstore.retentiondays": "event_store.retention_days",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/config/key_hints.go` around lines 139 - 141, The key mapping
table in key_hints.go is missing a legacy-to-current entry for the eventstore
enabled flag; add a mapping for the legacy key "eventstore.enabled" to the
canonical key "event_store.enabled" alongside the existing "eventstore" and
"eventstore.retentiondays" entries so lookups in the map that reference
eventstore enabled will resolve correctly (update the same mapping literal where
"eventstore" and "eventstore.retentiondays" are defined).
api/v1/api.yaml (1)

3500-3580: Consider extracting shared query params into components/parameters.

startTime, endTime, limit, and offset mirror existing audit-style filters; centralizing them would reduce drift risk across log-list APIs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 3500 - 3580, Extract the repeated query
parameters startTime, endTime, limit, and offset into reusable parameter
definitions under components/parameters (e.g., add parameters: StartTime,
EndTime, Limit, Offset with the same schema/constraints), then replace the
inline definitions in this operation with $ref references (for example use $ref:
"#/components/parameters/StartTime" etc.); ensure any other audit/log-list
endpoints that currently duplicate these fields are updated to reference the new
components to avoid drift and keep names/types identical to the existing
audit-style filters.
internal/core/exec/enqueue_retry.go (1)

113-117: Consider documenting that OnQueued failure does not roll back the enqueue.

The callback is invoked after both status persistence and queue enqueue succeed. If OnQueued returns an error, the item remains in the queue and the status remains as Queued. This is likely intentional (the callback is for supplementary operations like event emission), but documenting this behavior in the field's comment would help future maintainers understand the semantics.

📝 Suggested documentation enhancement
 	// OnQueued is called after the queued status and queue item are both durably written.
-	// Errors from this callback are returned to the caller.
+	// Errors from this callback are returned to the caller but do not roll back the
+	// already-persisted queue item and status.
 	OnQueued func(*DAGRunStatus) error
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/exec/enqueue_retry.go` around lines 113 - 117, The OnQueued
callback (opts.OnQueued) is invoked after status persistence and queue enqueue
and, if it returns an error, the item remains queued and the status stays as
Queued; update the field/comment for OnQueued to explicitly document this
behavior and its semantics (i.e., that OnQueued is for supplementary actions
like event emission, runs post-enqueue, its returned error is propagated but
does not roll back the enqueue or status). Locate the OnQueued field/type
definition and add a short note mentioning when it is called, that failures are
returned to the caller but do not undo persistence/enqueue, and recommend any
alternative patterns (e.g., compensating actions) if needed.
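
The semantics described in this nitpick can be illustrated with a small Go sketch. The types below are simplified stand-ins that only borrow the names `DAGRunStatus`, `EnqueueRetryOptions`, and `OnQueued` from the review; `memoryQueue`, `enqueueRetry`, and `demoFailingCallback` are hypothetical, and the persistence steps are simulated in memory.

```go
package main

import (
	"errors"
	"fmt"
)

// DAGRunStatus and EnqueueRetryOptions are simplified stand-ins.
type DAGRunStatus struct{ Status string }

type EnqueueRetryOptions struct {
	// OnQueued runs after the status and queue item are written.
	OnQueued func(*DAGRunStatus) error
}

// memoryQueue is a hypothetical in-memory queue used only for this sketch.
type memoryQueue struct{ items []string }

// enqueueRetry writes the queued status, enqueues the run, then calls
// OnQueued. A callback error is returned, but nothing is rolled back.
func enqueueRetry(q *memoryQueue, runID string, st *DAGRunStatus, opts EnqueueRetryOptions) error {
	st.Status = "queued"             // step 1: status persisted (simulated)
	q.items = append(q.items, runID) // step 2: queue item written (simulated)
	if opts.OnQueued != nil {
		if err := opts.OnQueued(st); err != nil {
			// No rollback: the item stays queued and the status stays "queued".
			return fmt.Errorf("on-queued callback: %w", err)
		}
	}
	return nil
}

// demoFailingCallback enqueues one run with a callback that always fails.
func demoFailingCallback() (queued int, status string, err error) {
	q := &memoryQueue{}
	st := &DAGRunStatus{}
	err = enqueueRetry(q, "run-1", st, EnqueueRetryOptions{
		OnQueued: func(*DAGRunStatus) error { return errors.New("event emit failed") },
	})
	return len(q.items), st.Status, err
}

func main() {
	queued, status, err := demoFailingCallback()
	fmt.Println(queued, status, err)
}
```

A caller that sees the error therefore cannot assume the enqueue failed; any cleanup would have to be an explicit compensating action.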
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/cmd/context.go`:
- Around line 215-229: The code currently calls fileeventstore.New
unconditionally when cfg.EventStore.Enabled is true, causing unnecessary disk
work even when the process is on the shared-nothing worker “fast path”; update
the guard so that before invoking fileeventstore.New you check for the
shared-nothing worker fast-path/early-return condition (the worker fast-path
check used elsewhere in this file) and skip event store initialization when that
condition is true, otherwise proceed to call fileeventstore.New and then
eventstore.New and eventstore.WithContext (keep references to baseCtx,
eventSourceInstance, eventSvc, eventSourceServiceForCommand(cmd.Name())
unchanged).

In `@internal/persis/fileeventstore/collector.go`:
- Around line 179-211: The appendGroup function can return on Flush/Sync
failures after partially writing some items, which causes duplicates on retry;
fix by tracking how many items were successfully written (e.g. increment a
writtenCount after each successful writer.Write and WriteByte), and on any error
(including Flush/Sync) still mark c.seenIDs for group[:writtenCount] and remove
their inbox files (using the existing item.path logic, skipping os.IsNotExist
errors); ensure the final-success path also processes all items, and reference
appendGroup, c.seenIDs, pendingInboxEvent.event.ID, pendingInboxEvent.path,
writer.Flush and f.Sync when making the change.
- Around line 264-293: The bufio.Scanner in loadSeenIDsFromFile (and likewise in
readCommittedEvents used by Query) uses the default 64KiB token limit which can
drop long lines; after creating each scanner (i.e., right after scanner :=
bufio.NewScanner(f)) call scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) to
raise the maximum token size to 10 MiB (keeping a 64 KiB initial buffer) so long
event lines are read instead of causing scanner errors and skipping events/IDs.
- Around line 296-355: cleanupExpired currently deletes expired files but never
prunes the in-memory dedupe map c.seenIDs, so add logic to prune seen IDs when
files are removed: in cleanupExpired, after successfully removing a committed
file (when you have variable window/path) call a new helper like
c.removeSeenIDsForWindow(window) that locks c.seenIDs and deletes IDs associated
with that committed window (e.g., IDs with timestamps <= window.end or otherwise
mapped to that file); likewise after removing a quarantined file (using
info.ModTime()/modDay and path) call a helper (e.g.,
c.removeSeenIDsBefore(cutoff) or c.removeSeenIDsForQuarantine(path)) to remove
seen IDs older than cutoff; implement these helpers to safely lock and modify
c.seenIDs (use existing sync primitives on Collector) so the in-memory dedupe
set does not grow forever.

In `@internal/persis/fileeventstore/store.go`:
- Around line 70-80: The event's RecordedAt is being synthesized into recordedAt
for the filename but not written into the marshaled JSON; before calling
json.Marshal(event) update event.RecordedAt when it is zero (set
event.RecordedAt = time.Now().UTC()) so the payload matches the
filename/timestamp; make this change in the block around json.Marshal(event) /
recordedAt / name (references: event.RecordedAt, recordedAt,
json.Marshal(event), name, fileutil.WriteFileAtomic, inboxSuffix, s.inboxDir).
- Around line 174-213: readCommittedEvents uses bufio.NewScanner without
increasing the default 64KiB token limit which will fail on oversized JSONL
lines; update the scanner initialization in readCommittedEvents to call
scanner.Buffer with a larger max token size (e.g. several megabytes) and an
appropriately sized initial buffer to accommodate large JSON events, do the same
change for the scanner usage in the collector (collector.go) and the fileaudit
store (internal/persis/fileaudit/store.go) to prevent scanner.Err() from failing
on long lines; reference the scanner variable created by bufio.NewScanner and
ensure the new max size is a named constant so it’s reusable across
readCommittedEvents and the other scanner sites.

In `@internal/service/coordinator/handler.go`:
- Around line 191-196: The coordinator is not using event-enriched contexts for
all terminal status writes, so functions like
markPreparedAttemptDispatchFailed() call attempt.Write() with the plain ctx and
miss event metadata; update all coordinator-originated terminal status write
calls (e.g., markPreparedAttemptDispatchFailed, any other places that call
attempt.Write for terminal statuses, and ReportStatus) to use
h.eventContext(ctx) (via Handler.eventContext) instead of the raw ctx so
eventstore.WithContext attaches the Source metadata and those failures appear in
/event-logs.

In `@internal/service/eventstore/context.go`:
- Around line 48-57: The helper EmitPersistedStatusFromContext should not use
the caller's cancellable RPC/request context when emitting an already-persisted
event; build a non-cancellable context (e.g., context.Background() or
context.WithTimeout(context.Background(), shortTimeout)) and call service.Emit
with that instead of ctx so a client disconnect or deadline won't drop the
event; locate EmitPersistedStatusFromContext (which calls FromContext,
PersistedDAGRunEventTypeForStatus, NewDAGRunEvent and service.Emit) and replace
the emitted context with a safe background/timeout context.

In `@internal/service/eventstore/eventstore_test.go`:
- Around line 22-30: The test currently excludes core.Rejected from the
centralized event feed (ok: false), which hides user rejections shown by
internal/service/frontend/api/v1/dagruns.go (it marks runs Rejected and sets
FinishedAt); update the test table in eventstore_test.go so core.Rejected is
treated as a terminal event (ok: true) and set want to the appropriate terminal
event type used by the codebase (prefer TypeDAGRunRejected if you add that
constant, or fold it into an existing terminal type such as
TypeDAGRunFailed/TypeDAGRunAborted consistent with the production mapping),
ensuring the test's mapping mirrors how Rejected is emitted or folded in the
event-emission code.

In `@internal/service/eventstore/eventstore.go`:
- Around line 315-317: The stableID function currently concatenates parts with
"\x1f", which is not collision-safe; update stableID to perform collision-safe
framing by hashing incrementally (use sha256.New, Write each part prefixed by
its length or write a fixed-size length header using encoding/binary) instead of
strings.Join, so that sequences like ["a","b\x1fc"] and ["a\x1fb","c"] produce
different digests; ensure you still return hex.EncodeToString of the final sum
to preserve the API.

In `@internal/service/frontend/api/v1/api.go`:
- Around line 734-736: The current updateDAGRunStatus always wraps writes with
withEventContext which causes duplicate lifecycle events; change
updateDAGRunStatus (the shared mutator used by handlers in dagruns.go) to fetch
the current persisted DAGRun status via dagRunMgr (or use an existing read
method) and only call dagRunMgr.UpdateStatus with a context that includes
withEventContext when the persisted top-level status actually changes to a
different lifecycle state; otherwise perform the update without withEventContext
(or skip emitting) so node-level rewrites while status is Waiting or terminal do
not append duplicate waiting/succeeded/failed/aborted events.

In `@internal/service/frontend/api/v1/events.go`:
- Around line 81-130: The handler should defensively handle a nil QueryResult
and nil entries from eventService.Query: after calling a.eventService.Query(ctx,
filter) check if result == nil and treat it like an empty result (returning zero
entries) or return a well-formed Error; when iterating result.Entries skip any
nil element (if e == nil continue) before accessing fields like e.ID,
e.SchemaVersion, etc.; also guard the make([...], len(result.Entries))
allocation by using 0 when result is nil. Update the code paths around the
symbols result, result.Entries, and the loop over for _, e := range
result.Entries in the function that calls a.eventService.Query to avoid panics.
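
The stableID comment above is about framing. The Go sketch below contrasts a separator-joined digest with a length-prefixed one. It assumes the existing function hashes the joined string with SHA-256, which the review implies but this page does not show; `joinedID` and `framedID` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// joinedID mimics the separator-based approach the review flags (assumed
// shape: SHA-256 over parts joined with "\x1f").
func joinedID(parts ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(parts, "\x1f")))
	return hex.EncodeToString(sum[:])
}

// framedID hashes each part behind an 8-byte big-endian length header, so
// part boundaries cannot be forged by separator bytes inside a part.
func framedID(parts ...string) string {
	h := sha256.New()
	var hdr [8]byte
	for _, p := range parts {
		binary.BigEndian.PutUint64(hdr[:], uint64(len(p)))
		h.Write(hdr[:]) // hash.Hash writes never return an error
		h.Write([]byte(p))
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	// Both inputs join to the same byte string "a\x1fb\x1fc".
	fmt.Println(joinedID("a", "b\x1fc") == joinedID("a\x1fb", "c")) // true
	fmt.Println(framedID("a", "b\x1fc") == framedID("a\x1fb", "c")) // false
}
```

Note that switching the framing changes every digest, so IDs produced before and after such a change would not match.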

---

Nitpick comments:
In `@api/v1/api.yaml`:
- Around line 3500-3580: Extract the repeated query parameters startTime,
endTime, limit, and offset into reusable parameter definitions under
components/parameters (e.g., add parameters: StartTime, EndTime, Limit, Offset
with the same schema/constraints), then replace the inline definitions in this
operation with $ref references (for example use $ref:
"#/components/parameters/StartTime" etc.); ensure any other audit/log-list
endpoints that currently duplicate these fields are updated to reference the new
components to avoid drift and keep names/types identical to the existing
audit-style filters.

In `@internal/cmn/config/key_hints.go`:
- Around line 139-141: The key mapping table in key_hints.go is missing a
legacy-to-current entry for the eventstore enabled flag; add a mapping for the
legacy key "eventstore.enabled" to the canonical key "event_store.enabled"
alongside the existing "eventstore" and "eventstore.retentiondays" entries so
lookups in the map that reference eventstore enabled will resolve correctly
(update the same mapping literal where "eventstore" and
"eventstore.retentiondays" are defined).

In `@internal/core/exec/enqueue_retry.go`:
- Around line 113-117: The OnQueued callback (opts.OnQueued) is invoked after
status persistence and queue enqueue and, if it returns an error, the item
remains queued and the status stays as Queued; update the field/comment for
OnQueued to explicitly document this behavior and its semantics (i.e., that
OnQueued is for supplementary actions like event emission, runs post-enqueue,
its returned error is propagated but does not roll back the enqueue or status).
Locate the OnQueued field/type definition and add a short note mentioning when
it is called, that failures are returned to the caller but do not undo
persistence/enqueue, and recommend any alternative patterns (e.g., compensating
actions) if needed.
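
Two of the inline comments above ask for a larger bufio.Scanner token limit. The Go sketch below shows the effect on a line longer than the default 64 KiB limit. The constant names and the `countLines` and `demoLongLine` helpers are hypothetical; the 64 KiB initial buffer and 10 MiB maximum are the values suggested in the review.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

const (
	scannerInitialBuf = 64 * 1024        // starting buffer size
	scannerMaxToken   = 10 * 1024 * 1024 // largest line accepted
)

// countLines counts newline-delimited records. When raiseLimit is true it
// lifts the scanner's token limit before scanning.
func countLines(r io.Reader, raiseLimit bool) (int, error) {
	scanner := bufio.NewScanner(r)
	if raiseLimit {
		scanner.Buffer(make([]byte, scannerInitialBuf), scannerMaxToken)
	}
	n := 0
	for scanner.Scan() {
		n++
	}
	return n, scanner.Err()
}

// demoLongLine scans a 100 KiB line followed by a short one, first with the
// default limit and then with the raised limit.
func demoLongLine() (defaultErr error, lines int, err error) {
	input := strings.Repeat("x", 100*1024) + "\n{}\n"
	_, defaultErr = countLines(strings.NewReader(input), false)
	lines, err = countLines(strings.NewReader(input), true)
	return defaultErr, lines, err
}

func main() {
	defaultErr, lines, err := demoLongLine()
	fmt.Println(defaultErr) // bufio.Scanner: token too long
	fmt.Println(lines, err) // 2 <nil>
}
```

With the default limit the scanner stops at the oversized line, so every record after it is lost as well, which is why the review treats this as more than a cosmetic issue.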

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e9054263-85e3-4a52-b3fb-b38801d24e61

📥 Commits

Reviewing files that changed from the base of the PR and between eaee029 and 6720648.

📒 Files selected for processing (34)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/agent/api.go
  • internal/cmd/context.go
  • internal/cmd/coord.go
  • internal/cmd/scheduler.go
  • internal/cmd/startall.go
  • internal/cmd/status_test.go
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/cmn/config/key_hints.go
  • internal/cmn/config/loader.go
  • internal/cmn/config/loader_test.go
  • internal/cmn/config/path.go
  • internal/cmn/config/path_test.go
  • internal/cmn/schema/config.schema.json
  • internal/core/exec/enqueue_retry.go
  • internal/persis/filedagrun/attempt.go
  • internal/persis/fileeventstore/collector.go
  • internal/persis/fileeventstore/collector_test.go
  • internal/persis/fileeventstore/store.go
  • internal/persis/fileeventstore/store_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/service.go
  • internal/service/eventstore/context.go
  • internal/service/eventstore/eventstore.go
  • internal/service/eventstore/eventstore_test.go
  • internal/service/frontend/api/v1/api.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/events.go
  • internal/service/frontend/api/v1/events_test.go
  • internal/service/frontend/server.go
  • internal/service/scheduler/scheduler.go
  • internal/test/helper.go

Comment thread internal/cmd/context.go
Comment on lines +215 to +229
baseCtx := ctx
eventSourceInstance := eventstore.DefaultSourceInstance()
var eventSvc *eventstore.Service
if cfg.EventStore.Enabled {
	store, eventErr := fileeventstore.New(cfg.Paths.EventStoreDir)
	if eventErr != nil {
		logger.Warn(ctx, "Failed to initialize event store; continuing without event persistence", tag.Error(eventErr))
	} else {
		eventSvc = eventstore.New(store)
		ctx = eventstore.WithContext(ctx, eventSvc, eventstore.Source{
			Service:  eventSourceServiceForCommand(cmd.Name()),
			Instance: eventSourceInstance,
		})
	}
}

⚠️ Potential issue | 🟡 Minor

Skip event-store setup on the shared-nothing worker fast path.

This block still instantiates the file-backed store before the worker early return and then throws it away. That adds unnecessary filesystem work and can generate startup warnings on workers that are explicitly supposed to avoid local storage.

♻️ Minimal fix
 	baseCtx := ctx
 	eventSourceInstance := eventstore.DefaultSourceInstance()
 	var eventSvc *eventstore.Service
-	if cfg.EventStore.Enabled {
+	sharedNothingWorker := isSharedNothingWorker(cmd, cfg)
+	if !sharedNothingWorker && cfg.EventStore.Enabled {
 		store, eventErr := fileeventstore.New(cfg.Paths.EventStoreDir)
 		if eventErr != nil {
 			logger.Warn(ctx, "Failed to initialize event store; continuing without event persistence", tag.Error(eventErr))
 		} else {
 			eventSvc = eventstore.New(store)
@@
-	if isSharedNothingWorker(cmd, cfg) {
+	if sharedNothingWorker {
 		logger.Debug(ctx, "Shared-nothing worker mode: skipping file-based stores",
 			slog.Any("coordinators", cfg.Worker.Coordinators),
 		)

Also applies to: 237-245

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/context.go` around lines 215 - 229, The code currently calls
fileeventstore.New unconditionally when cfg.EventStore.Enabled is true, causing
unnecessary disk work even when the process is on the shared-nothing worker
“fast path”; update the guard so that before invoking fileeventstore.New you
check for the shared-nothing worker fast-path/early-return condition (the worker
fast-path check used elsewhere in this file) and skip event store initialization
when that condition is true, otherwise proceed to call fileeventstore.New and
then eventstore.New and eventstore.WithContext (keep references to baseCtx,
eventSourceInstance, eventSvc, eventSourceServiceForCommand(cmd.Name())
unchanged).

Comment on lines +179 to +211
func (c *Collector) appendGroup(hour string, group []pendingInboxEvent) error {
	logPath := filepath.Join(c.store.baseDir, logPrefix+hour+logSuffix)
	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, filePermissions) //nolint:gosec // controlled path
	if err != nil {
		return fmt.Errorf("open event log %s: %w", logPath, err)
	}
	defer func() { _ = f.Close() }()

	writer := bufio.NewWriter(f)
	for _, item := range group {
		if _, err := writer.Write(item.raw); err != nil {
			return fmt.Errorf("append event log %s: %w", logPath, err)
		}
		if err := writer.WriteByte('\n'); err != nil {
			return fmt.Errorf("append newline %s: %w", logPath, err)
		}
	}
	if err := writer.Flush(); err != nil {
		return fmt.Errorf("flush event log %s: %w", logPath, err)
	}
	if err := f.Sync(); err != nil {
		return fmt.Errorf("sync event log %s: %w", logPath, err)
	}

	for _, item := range group {
		c.seenIDs[item.event.ID] = struct{}{}
		if err := os.Remove(item.path); err != nil && !os.IsNotExist(err) {
			slog.Warn("fileeventstore: failed to delete processed inbox file",
				slog.String("file", item.path),
				slog.String("error", err.Error()))
		}
	}
	return nil

⚠️ Potential issue | 🟠 Major

Make append retries idempotent after partial write failures.

If Flush or Sync fails after some lines already reached disk, this returns before updating seenIDs or deleting inbox files. The next drain re-reads the same inbox files and can append duplicates for events that were already persisted.

🛠️ Possible mitigation
 func (c *Collector) appendGroup(hour string, group []pendingInboxEvent) error {
 	logPath := filepath.Join(c.store.baseDir, logPrefix+hour+logSuffix)
+	reloadSeenIDs := func() {
+		if err := c.loadSeenIDsFromFile(logPath); err != nil {
+			slog.Warn("fileeventstore: failed to reload seen-set after append error",
+				slog.String("file", logPath),
+				slog.String("error", err.Error()))
+		}
+	}
 	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, filePermissions) //nolint:gosec // controlled path
 	if err != nil {
 		return fmt.Errorf("open event log %s: %w", logPath, err)
 	}
@@
 	if err := writer.Flush(); err != nil {
+		reloadSeenIDs()
 		return fmt.Errorf("flush event log %s: %w", logPath, err)
 	}
 	if err := f.Sync(); err != nil {
+		reloadSeenIDs()
 		return fmt.Errorf("sync event log %s: %w", logPath, err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *Collector) appendGroup(hour string, group []pendingInboxEvent) error {
	logPath := filepath.Join(c.store.baseDir, logPrefix+hour+logSuffix)
	reloadSeenIDs := func() {
		if err := c.loadSeenIDsFromFile(logPath); err != nil {
			slog.Warn("fileeventstore: failed to reload seen-set after append error",
				slog.String("file", logPath),
				slog.String("error", err.Error()))
		}
	}
	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, filePermissions) //nolint:gosec // controlled path
	if err != nil {
		return fmt.Errorf("open event log %s: %w", logPath, err)
	}
	defer func() { _ = f.Close() }()
	writer := bufio.NewWriter(f)
	for _, item := range group {
		if _, err := writer.Write(item.raw); err != nil {
			return fmt.Errorf("append event log %s: %w", logPath, err)
		}
		if err := writer.WriteByte('\n'); err != nil {
			return fmt.Errorf("append newline %s: %w", logPath, err)
		}
	}
	if err := writer.Flush(); err != nil {
		reloadSeenIDs()
		return fmt.Errorf("flush event log %s: %w", logPath, err)
	}
	if err := f.Sync(); err != nil {
		reloadSeenIDs()
		return fmt.Errorf("sync event log %s: %w", logPath, err)
	}
	for _, item := range group {
		c.seenIDs[item.event.ID] = struct{}{}
		if err := os.Remove(item.path); err != nil && !os.IsNotExist(err) {
			slog.Warn("fileeventstore: failed to delete processed inbox file",
				slog.String("file", item.path),
				slog.String("error", err.Error()))
		}
	}
	return nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/collector.go` around lines 179 - 211, The
appendGroup function can return on Flush/Sync failures after partially writing
some items, which causes duplicates on retry; fix by tracking how many items
were successfully written (e.g. increment a writtenCount after each successful
writer.Write and WriteByte), and on any error (including Flush/Sync) still mark
c.seenIDs for group[:writtenCount] and remove their inbox files (using the
existing item.path logic, skipping os.IsNotExist errors); ensure the
final-success path also processes all items, and reference appendGroup,
c.seenIDs, pendingInboxEvent.event.ID, pendingInboxEvent.path, writer.Flush and
f.Sync when making the change.

Comment on lines +264 to +293
func (c *Collector) loadSeenIDsFromFile(filePath string) error {
f, err := os.Open(filePath) //nolint:gosec // controlled path
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("open event log %s: %w", filePath, err)
}
defer func() { _ = f.Close() }()

scanner := bufio.NewScanner(f)
lineNum := 0
for scanner.Scan() {
lineNum++
event := new(eventstore.Event)
if err := json.Unmarshal(scanner.Bytes(), event); err != nil {
slog.Warn("fileeventstore: skipping malformed committed event while loading seen-set",
slog.String("file", filePath),
slog.Int("line", lineNum),
slog.String("error", err.Error()))
continue
}
if event.ID != "" {
c.seenIDs[event.ID] = struct{}{}
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("scan event log %s: %w", filePath, err)
}
return nil

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's check if the file exists and read it
cat -n internal/persis/fileeventstore/collector.go | head -300 | tail -50

Repository: dagu-org/dagu

Length of output: 1565


🏁 Script executed:

# Let's also search for the Query method mentioned in the review
rg "func.*Query" internal/persis/fileeventstore/

Repository: dagu-org/dagu

Length of output: 709


🏁 Script executed:

# Check for the file and basic information
ls -la internal/persis/fileeventstore/collector.go
wc -l internal/persis/fileeventstore/collector.go

Repository: dagu-org/dagu

Length of output: 197


🏁 Script executed:

# Check the Query and readCommittedEvents methods in store.go
rg -A 30 "func \(s \*Store\) Query" internal/persis/fileeventstore/store.go

Repository: dagu-org/dagu

Length of output: 925


🏁 Script executed:

# Check readCommittedEvents method
rg -A 30 "func \(s \*Store\) readCommittedEvents" internal/persis/fileeventstore/store.go

Repository: dagu-org/dagu

Length of output: 1055


🏁 Script executed:

# Check if the file has GPL v3 license header
head -20 internal/persis/fileeventstore/collector.go

Repository: dagu-org/dagu

Length of output: 334


Lift the bufio.Scanner token limit when rebuilding seenIDs and querying events.

Both loadSeenIDsFromFile and readCommittedEvents (called by Query) use bufio.NewScanner with the default 64 KiB token limit. When a line exceeds this limit, scanner.Scan() returns false and scanner.Err() reports bufio.ErrTooLong, so the loop stops and events after that point are never processed. In loadSeenIDsFromFile, those IDs never enter c.seenIDs, which breaks deduplication. In readCommittedEvents, they are dropped from query results.

Add scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) after creating each scanner to allow up to 10 MiB lines.
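
The effect of the default limit can be reproduced in isolation. The sketch below is illustrative only: countLines is a hypothetical helper, not code from this PR, and it scans an in-memory string rather than an event log file.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// countLines is a hypothetical helper: it scans newline-delimited input and
// returns how many lines were read before the scanner stopped.
// maxToken <= 0 keeps bufio.Scanner's default limit (bufio.MaxScanTokenSize).
func countLines(input string, maxToken int) (int, error) {
	scanner := bufio.NewScanner(strings.NewReader(input))
	if maxToken > 0 {
		// Start with a 64 KiB buffer and allow it to grow up to maxToken bytes.
		scanner.Buffer(make([]byte, 64*1024), maxToken)
	}
	n := 0
	for scanner.Scan() {
		n++
	}
	return n, scanner.Err()
}

func main() {
	// Three lines; the middle one is 100 KiB, longer than the default limit.
	input := "short\n" + strings.Repeat("x", 100*1024) + "\nafter\n"

	// Default limit: scanning stops at the long line with bufio.ErrTooLong.
	n, err := countLines(input, 0)
	fmt.Println(n, errors.Is(err, bufio.ErrTooLong))

	// Raised limit: all three lines are read.
	n, err = countLines(input, 10*1024*1024)
	fmt.Println(n, err)
}
```

With the default limit the line after the oversized one is never seen, which is the behavior the finding describes.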

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/collector.go` around lines 264 - 293, The
bufio.Scanner in loadSeenIDsFromFile (and likewise in readCommittedEvents used
by Query) uses the default 64KiB token limit which can drop long lines; after
creating each scanner (i.e., right after scanner := bufio.NewScanner(f)) call
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) to raise the maximum token
size to 10 MiB (keeping a 64 KiB initial buffer) so long event lines are read
instead of causing scanner errors and skipping events/IDs.

Comment on lines +296 to +355
func (c *Collector) cleanupExpired() {
if c.retentionDays <= 0 {
return
}

now := c.now().UTC()
cutoff := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).
AddDate(0, 0, -c.retentionDays)

baseEntries, err := os.ReadDir(c.store.baseDir)
if err == nil {
for _, entry := range baseEntries {
if entry.IsDir() {
continue
}
window, ok := parseCommittedFileWindow(filepath.Join(c.store.baseDir, entry.Name()), entry.Name())
if !ok || window.end.After(cutoff) {
continue
}
path := window.path
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
slog.Warn("fileeventstore: failed to remove expired event log",
slog.String("file", path),
slog.String("error", err.Error()))
}
}
} else if !os.IsNotExist(err) {
slog.Warn("fileeventstore: failed to read event store directory for cleanup",
slog.String("dir", c.store.baseDir),
slog.String("error", err.Error()))
}

quarantineEntries, err := os.ReadDir(c.store.quarantineDir)
if err != nil {
if !os.IsNotExist(err) {
slog.Warn("fileeventstore: failed to read quarantine directory for cleanup",
slog.String("dir", c.store.quarantineDir),
slog.String("error", err.Error()))
}
return
}
for _, entry := range quarantineEntries {
if entry.IsDir() {
continue
}
info, err := entry.Info()
if err != nil {
continue
}
modDay, ok := utcDay(info.ModTime())
if !ok || !modDay.Before(cutoff) {
continue
}
path := filepath.Join(c.store.quarantineDir, entry.Name())
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
slog.Warn("fileeventstore: failed to remove expired quarantined event file",
slog.String("file", path),
slog.String("error", err.Error()))
}
}

⚠️ Potential issue | 🟠 Major

Retention cleanup needs to prune the in-memory dedupe set too.

cleanupExpired removes old log files, but c.seenIDs only ever grows. A long-lived scheduler will retain every historical ID in memory and keep suppressing IDs that should have expired with the deleted files.

🛠️ Proposed fix
 func (c *Collector) cleanupExpired() {
 	if c.retentionDays <= 0 {
 		return
 	}
@@
 	cutoff := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).
 		AddDate(0, 0, -c.retentionDays)
+	removedCommitted := false
 
 	baseEntries, err := os.ReadDir(c.store.baseDir)
 	if err == nil {
@@
 			path := window.path
 			if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
 				slog.Warn("fileeventstore: failed to remove expired event log",
 					slog.String("file", path),
 					slog.String("error", err.Error()))
+			} else {
+				removedCommitted = true
 			}
 		}
 	} else if !os.IsNotExist(err) {
@@
 			slog.String("error", err.Error()))
 	}
+
+	if removedCommitted {
+		c.seenIDs = make(map[string]struct{})
+		if err := c.loadSeenIDs(); err != nil {
+			slog.Warn("fileeventstore: failed to rebuild seen-set after cleanup",
+				slog.String("dir", c.store.baseDir),
+				slog.String("error", err.Error()))
+		}
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/collector.go` around lines 296 - 355,
cleanupExpired currently deletes expired files but never prunes the in-memory
dedupe map c.seenIDs, so add logic to prune seen IDs when files are removed: in
cleanupExpired, after successfully removing a committed file (when you have
variable window/path) call a new helper like c.removeSeenIDsForWindow(window)
that locks c.seenIDs and deletes IDs associated with that committed window
(e.g., IDs with timestamps <= window.end or otherwise mapped to that file);
likewise after removing a quarantined file (using info.ModTime()/modDay and
path) call a helper (e.g., c.removeSeenIDsBefore(cutoff) or
c.removeSeenIDsForQuarantine(path)) to remove seen IDs older than cutoff;
implement these helpers to safely lock and modify c.seenIDs (use existing sync
primitives on Collector) so the in-memory dedupe set does not grow forever.

Comment thread internal/persis/fileeventstore/store.go Outdated
Comment on lines +70 to +80
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("fileeventstore: marshal event: %w", err)
}
recordedAt := event.RecordedAt
if recordedAt.IsZero() {
recordedAt = time.Now().UTC()
}
name := fmt.Sprintf("%020d-%s%s", recordedAt.UnixNano(), uuid.NewString(), inboxSuffix)
path := filepath.Join(s.inboxDir, name)
if err := fileutil.WriteFileAtomic(path, data, filePermissions); err != nil {

⚠️ Potential issue | 🟠 Major

Default RecordedAt before marshaling the inbox payload.

When RecordedAt is zero, the synthesized timestamp is only used in the filename. The JSON written to inbox still carries the zero value, so downstream ordering/audit fields can drift from what the file metadata says.

🛠️ Proposed fix
 func (s *Store) Emit(_ context.Context, event *eventstore.Event) error {
 	if event == nil {
 		return errors.New("fileeventstore: event cannot be nil")
 	}
-	data, err := json.Marshal(event)
-	if err != nil {
-		return fmt.Errorf("fileeventstore: marshal event: %w", err)
-	}
-	recordedAt := event.RecordedAt
+	payload := *event
+	recordedAt := payload.RecordedAt
 	if recordedAt.IsZero() {
 		recordedAt = time.Now().UTC()
+		payload.RecordedAt = recordedAt
+	}
+	data, err := json.Marshal(&payload)
+	if err != nil {
+		return fmt.Errorf("fileeventstore: marshal event: %w", err)
 	}
 	name := fmt.Sprintf("%020d-%s%s", recordedAt.UnixNano(), uuid.NewString(), inboxSuffix)
📝 Committable suggestion

Suggested change
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("fileeventstore: marshal event: %w", err)
}
recordedAt := event.RecordedAt
if recordedAt.IsZero() {
recordedAt = time.Now().UTC()
}
name := fmt.Sprintf("%020d-%s%s", recordedAt.UnixNano(), uuid.NewString(), inboxSuffix)
path := filepath.Join(s.inboxDir, name)
if err := fileutil.WriteFileAtomic(path, data, filePermissions); err != nil {
payload := *event
recordedAt := payload.RecordedAt
if recordedAt.IsZero() {
recordedAt = time.Now().UTC()
payload.RecordedAt = recordedAt
}
data, err := json.Marshal(&payload)
if err != nil {
return fmt.Errorf("fileeventstore: marshal event: %w", err)
}
name := fmt.Sprintf("%020d-%s%s", recordedAt.UnixNano(), uuid.NewString(), inboxSuffix)
path := filepath.Join(s.inboxDir, name)
if err := fileutil.WriteFileAtomic(path, data, filePermissions); err != nil {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/store.go` around lines 70 - 80, The event's
RecordedAt is being synthesized into recordedAt for the filename but not written
into the marshaled JSON; before calling json.Marshal(event) update
event.RecordedAt when it is zero (set event.RecordedAt = time.Now().UTC()) so
the payload matches the filename/timestamp; make this change in the block around
json.Marshal(event) / recordedAt / name (references: event.RecordedAt,
recordedAt, json.Marshal(event), name, fileutil.WriteFileAtomic, inboxSuffix,
s.inboxDir).

Comment on lines +22 to +30
{name: "NotStarted", status: core.NotStarted, ok: false},
{name: "Queued", status: core.Queued, ok: false},
{name: "Running", status: core.Running, ok: false},
{name: "Rejected", status: core.Rejected, ok: false},
{name: "Waiting", status: core.Waiting, want: TypeDAGRunWaiting, ok: true},
{name: "Succeeded", status: core.Succeeded, want: TypeDAGRunSucceeded, ok: true},
{name: "PartiallySucceeded", status: core.PartiallySucceeded, want: TypeDAGRunSucceeded, ok: true},
{name: "Failed", status: core.Failed, want: TypeDAGRunFailed, ok: true},
{name: "Aborted", status: core.Aborted, want: TypeDAGRunAborted, ok: true},

⚠️ Potential issue | 🟠 Major

Don't lock Rejected out of the centralized event feed.

The rejection path in internal/service/frontend/api/v1/dagruns.go marks the DAG run as core.Rejected and sets FinishedAt, so this is a terminal outcome. Asserting ok=false here means /event-logs will never surface user rejections. Please emit a dedicated rejected event, or explicitly fold Rejected into another terminal type, before codifying this behavior in the test.
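
One way to picture the first option (a dedicated rejected event) is the mapping sketch below. Everything in it is an assumption for illustration: Status and EventType are simplified stand-ins for the project's types, the string values are placeholders, eventTypeForStatus is a hypothetical name, and TypeDAGRunRejected does not exist in the PR.

```go
package main

import "fmt"

// Status and EventType are simplified stand-ins (assumption); the string
// values below are placeholders, not the project's real constants.
type Status string
type EventType string

const (
	NotStarted         Status = "not_started"
	Queued             Status = "queued"
	Running            Status = "running"
	Rejected           Status = "rejected"
	Waiting            Status = "waiting"
	Succeeded          Status = "succeeded"
	PartiallySucceeded Status = "partially_succeeded"
	Failed             Status = "failed"
	Aborted            Status = "aborted"
)

const (
	TypeDAGRunWaiting   EventType = "dag_run.waiting"
	TypeDAGRunSucceeded EventType = "dag_run.succeeded"
	TypeDAGRunFailed    EventType = "dag_run.failed"
	TypeDAGRunAborted   EventType = "dag_run.aborted"
	// TypeDAGRunRejected is hypothetical; the PR does not define it.
	TypeDAGRunRejected EventType = "dag_run.rejected"
)

// eventTypeForStatus (hypothetical name) maps waiting and terminal statuses
// to an event type and reports false for statuses that emit nothing.
func eventTypeForStatus(s Status) (EventType, bool) {
	switch s {
	case Waiting:
		return TypeDAGRunWaiting, true
	case Succeeded, PartiallySucceeded:
		return TypeDAGRunSucceeded, true
	case Failed:
		return TypeDAGRunFailed, true
	case Aborted:
		return TypeDAGRunAborted, true
	case Rejected:
		return TypeDAGRunRejected, true
	default:
		return "", false
	}
}

func main() {
	for _, s := range []Status{Running, Rejected, PartiallySucceeded} {
		typ, ok := eventTypeForStatus(s)
		fmt.Printf("%s -> %q %v\n", s, typ, ok)
	}
}
```

Folding Rejected into an existing terminal type instead would only change the Rejected case; the test table would then need to mirror whichever choice the production mapping makes.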

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/eventstore_test.go` around lines 22 - 30, The
test currently excludes core.Rejected from the centralized event feed (ok:
false), which hides user rejections shown by
internal/service/frontend/api/v1/dagruns.go (it marks runs Rejected and sets
FinishedAt); update the test table in eventstore_test.go so core.Rejected is
treated as a terminal event (ok: true) and set want to the appropriate terminal
event type used by the codebase (prefer TypeDAGRunRejected if you add that
constant, or fold it into an existing terminal type such as
TypeDAGRunFailed/TypeDAGRunAborted consistent with the production mapping),
ensuring the test's mapping mirrors how Rejected is emitted or folded in the
event-emission code.

Comment on lines +75 to +96
func (e *Event) Normalize() {
if e == nil {
return
}
if e.SchemaVersion == 0 {
e.SchemaVersion = SchemaVersion
}
if e.SourceService == "" {
e.SourceService = SourceServiceUnknown
}
if e.SourceInstance == "" {
e.SourceInstance = DefaultSourceInstance()
}
if e.RecordedAt.IsZero() {
e.RecordedAt = time.Now().UTC()
} else {
e.RecordedAt = e.RecordedAt.UTC()
}
if !e.OccurredAt.IsZero() {
e.OccurredAt = e.OccurredAt.UTC()
}
}

⚠️ Potential issue | 🟠 Major

Keep Normalize() read-safe.

Normalize() is doing write-time repair (schema_version, source_*, recorded_at) rather than just canonicalization. Because the file-backed query path also runs it, a partially populated record can pick up the reader's host:pid and current clock on every query instead of failing validation. Please split UTC normalization from emit-time defaulting.
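
A rough sketch of such a split follows. It is not the PR's code: Event is a trimmed-down stand-in for eventstore.Event, and CanonicalizeTimes, ApplyEmitDefaults, ValidateStored, and fixedNow are hypothetical names, as are the default values used for schema version and source service.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Event is a trimmed-down stand-in for eventstore.Event (assumption: only
// the fields that Normalize touches are shown).
type Event struct {
	SchemaVersion  int
	SourceService  string
	SourceInstance string
	OccurredAt     time.Time
	RecordedAt     time.Time
}

// fixedNow is an arbitrary timestamp used to keep the example deterministic.
var fixedNow = time.Date(2026, time.April, 1, 0, 0, 0, 0, time.UTC)

// CanonicalizeTimes is safe on the read path: it only converts timestamps
// that are already set to UTC and never invents values.
func CanonicalizeTimes(e *Event) {
	if e == nil {
		return
	}
	if !e.RecordedAt.IsZero() {
		e.RecordedAt = e.RecordedAt.UTC()
	}
	if !e.OccurredAt.IsZero() {
		e.OccurredAt = e.OccurredAt.UTC()
	}
}

// ApplyEmitDefaults fills missing fields and is meant for the write path only.
// The defaults (1, "unknown") are placeholders for the real constants.
func ApplyEmitDefaults(e *Event, now time.Time, instance string) {
	if e == nil {
		return
	}
	if e.SchemaVersion == 0 {
		e.SchemaVersion = 1
	}
	if e.SourceService == "" {
		e.SourceService = "unknown"
	}
	if e.SourceInstance == "" {
		e.SourceInstance = instance
	}
	if e.RecordedAt.IsZero() {
		e.RecordedAt = now
	}
	CanonicalizeTimes(e)
}

// ValidateStored rejects incomplete records instead of repairing them.
func ValidateStored(e *Event) error {
	if e == nil || e.SchemaVersion == 0 || e.SourceService == "" || e.RecordedAt.IsZero() {
		return errors.New("event is missing required fields")
	}
	return nil
}

func main() {
	stored := &Event{} // a partially populated record read back from disk
	CanonicalizeTimes(stored)
	fmt.Println(stored.RecordedAt.IsZero(), ValidateStored(stored) != nil)

	fresh := &Event{} // a new event on the emit path
	ApplyEmitDefaults(fresh, fixedNow, "host:123")
	fmt.Println(fresh.SourceInstance, ValidateStored(fresh))
}
```

With this shape the query path would call only CanonicalizeTimes and ValidateStored, so a record missing source_instance or recorded_at fails validation rather than inheriting the reader's identity and clock.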

Comment on lines +315 to +317
func stableID(parts ...string) string {
sum := sha256.Sum256([]byte(strings.Join(parts, "\x1f")))
return hex.EncodeToString(sum[:])

⚠️ Potential issue | 🟡 Minor

stableID is ambiguous for arbitrary string parts.

strings.Join(parts, "\x1f") is not a collision-safe framing: ["a", "b\x1fc"] and ["a\x1fb", "c"] produce the same byte stream, so two distinct events can hash to the same ID. Length-prefix each part, or hash incrementally, before combining them.
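
The ambiguity is easy to reproduce. In the sketch below, joinedID mirrors the separator-based scheme under review and framedID is a length-prefixed variant; both names are hypothetical and exist only for this demonstration.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// joinedID mirrors the separator-based scheme: parts are joined with the
// unit separator byte and hashed as one stream.
func joinedID(parts ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(parts, "\x1f")))
	return hex.EncodeToString(sum[:])
}

// framedID writes an 8-byte big-endian length before each part, so the part
// boundaries are unambiguous in the hashed stream.
func framedID(parts ...string) string {
	h := sha256.New()
	var lenBuf [8]byte
	for _, part := range parts {
		binary.BigEndian.PutUint64(lenBuf[:], uint64(len(part)))
		_, _ = h.Write(lenBuf[:])
		_, _ = h.Write([]byte(part))
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := []string{"a", "b\x1fc"}
	b := []string{"a\x1fb", "c"}

	// Both inputs serialize to the same bytes "a\x1fb\x1fc" when joined.
	fmt.Println("joined IDs equal:", joinedID(a...) == joinedID(b...))
	// Length prefixes keep the two inputs distinct.
	fmt.Println("framed IDs equal:", framedID(a...) == framedID(b...))
}
```

Whether this matters in practice depends on whether any part can contain the separator byte; the framing removes the need to reason about that.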

🔐 Collision-safe framing for the hash input
 import (
 	"context"
 	"crypto/sha256"
+	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
 	"maps"
 	"os"
-	"strings"
 	"time"
 )
@@
 func stableID(parts ...string) string {
-	sum := sha256.Sum256([]byte(strings.Join(parts, "\x1f")))
-	return hex.EncodeToString(sum[:])
+	h := sha256.New()
+	var lenBuf [8]byte
+	for _, part := range parts {
+		binary.BigEndian.PutUint64(lenBuf[:], uint64(len(part)))
+		_, _ = h.Write(lenBuf[:])
+		_, _ = h.Write([]byte(part))
+	}
+	return hex.EncodeToString(h.Sum(nil))
 }
📝 Committable suggestion

Suggested change
func stableID(parts ...string) string {
sum := sha256.Sum256([]byte(strings.Join(parts, "\x1f")))
return hex.EncodeToString(sum[:])
func stableID(parts ...string) string {
h := sha256.New()
var lenBuf [8]byte
for _, part := range parts {
binary.BigEndian.PutUint64(lenBuf[:], uint64(len(part)))
_, _ = h.Write(lenBuf[:])
_, _ = h.Write([]byte(part))
}
return hex.EncodeToString(h.Sum(nil))
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/eventstore.go` around lines 315 - 317, The
stableID function currently concatenates parts with "\x1f", which is not
collision-safe; update stableID to perform collision-safe framing by hashing
incrementally (use sha256.New, Write each part prefixed by its length or write a
fixed-size length header using encoding/binary) instead of strings.Join, so that
sequences like ["a","b\x1fc"] and ["a\x1fb","c"] produce different digests;
ensure you still return hex.EncodeToString of the final sum to preserve the API.

Comment on lines +734 to +736
func (a *API) updateDAGRunStatus(ctx context.Context, ref exec.DAGRunRef, status exec.DAGRunStatus) error {
return a.dagRunMgr.UpdateStatus(a.withEventContext(ctx), ref, status)
}

⚠️ Potential issue | 🟠 Major

Only emit persisted DAG-run events when the top-level status changes.

updateDAGRunStatus is now the shared path for node-level mutations in internal/service/frontend/api/v1/dagruns.go. Those handlers often rewrite a run while its top-level status is still Waiting or already terminal, so wrapping every write with withEventContext will append duplicate waiting/succeeded/failed/aborted lifecycle events to /event-logs even though no DAG-run transition happened. Keep event context on true top-level transitions only, or compare the previously persisted status before emitting.
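
The second option (compare the previously persisted status) could look roughly like the guard below. This is a sketch under assumptions: Status is a simplified stand-in for the project's status type, and emitsEvent and shouldEmit are hypothetical helpers, not functions from the PR.

```go
package main

import "fmt"

// Status is a simplified stand-in for the DAG-run status type (assumption).
type Status string

const (
	Running   Status = "running"
	Waiting   Status = "waiting"
	Succeeded Status = "succeeded"
	Failed    Status = "failed"
	Aborted   Status = "aborted"
)

// emitsEvent reports whether a status is one the event feed records
// (waiting and terminal states, per the PR summary).
func emitsEvent(s Status) bool {
	switch s {
	case Waiting, Succeeded, Failed, Aborted:
		return true
	}
	return false
}

// shouldEmit reports whether writing next over the persisted prev is a real
// top-level transition into a recorded state.
func shouldEmit(prev, next Status) bool {
	return prev != next && emitsEvent(next)
}

func main() {
	fmt.Println(shouldEmit(Running, Waiting))     // real transition
	fmt.Println(shouldEmit(Waiting, Waiting))     // node-level rewrite while waiting
	fmt.Println(shouldEmit(Succeeded, Succeeded)) // rewrite of a finished run
	fmt.Println(shouldEmit(Waiting, Failed))      // real transition
}
```

A caller would read the persisted status first and attach the event context only when shouldEmit returns true; how that read is performed depends on what dagRunMgr exposes.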

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/api.go` around lines 734 - 736, The current
updateDAGRunStatus always wraps writes with withEventContext which causes
duplicate lifecycle events; change updateDAGRunStatus (the shared mutator used
by handlers in dagruns.go) to fetch the current persisted DAGRun status via
dagRunMgr (or use an existing read method) and only call dagRunMgr.UpdateStatus
with a context that includes withEventContext when the persisted top-level
status actually changes to a different lifecycle state; otherwise perform the
update without withEventContext (or skip emitting) so node-level rewrites while
status is Waiting or terminal do not append duplicate
waiting/succeeded/failed/aborted events.

Comment on lines +81 to +130
result, err := a.eventService.Query(ctx, filter)
if err != nil {
return nil, &Error{
Code: api.ErrorCodeInternalError,
Message: "Failed to query event logs",
HTTPStatus: http.StatusInternalServerError,
}
}

entries := make([]api.EventLogEntry, 0, len(result.Entries))
for _, e := range result.Entries {
entry := api.EventLogEntry{
Id: e.ID,
SchemaVersion: e.SchemaVersion,
OccurredAt: e.OccurredAt,
RecordedAt: e.RecordedAt,
Kind: string(e.Kind),
Type: string(e.Type),
SourceService: e.SourceService,
}
if e.SourceInstance != "" {
entry.SourceInstance = &e.SourceInstance
}
if e.DAGName != "" {
entry.DagName = &e.DAGName
}
if e.DAGRunID != "" {
entry.DagRunId = &e.DAGRunID
}
if e.AttemptID != "" {
entry.AttemptId = &e.AttemptID
}
if e.SessionID != "" {
entry.SessionId = &e.SessionID
}
if e.UserID != "" {
entry.UserId = &e.UserID
}
if e.Model != "" {
entry.Model = &e.Model
}
if e.Status != "" {
entry.Status = &e.Status
}
if len(e.Data) > 0 {
data := make(map[string]any, len(e.Data))
maps.Copy(data, e.Data)
entry.Data = &data
}
entries = append(entries, entry)

⚠️ Potential issue | 🟠 Major

Handle nil query results defensively.

Store.Query returns *QueryResult with Entries []*eventstore.Event; if a store ever returns nil, nil or includes a nil entry, this handler panics on len(result.Entries) or e.ID instead of returning a response.

🛡️ Minimal fix
 	result, err := a.eventService.Query(ctx, filter)
 	if err != nil {
 		return nil, &Error{
 			Code:       api.ErrorCodeInternalError,
 			Message:    "Failed to query event logs",
 			HTTPStatus: http.StatusInternalServerError,
 		}
 	}
+	if result == nil {
+		result = &eventstore.QueryResult{}
+	}
 
 	entries := make([]api.EventLogEntry, 0, len(result.Entries))
 	for _, e := range result.Entries {
+		if e == nil {
+			continue
+		}
 		entry := api.EventLogEntry{
 			Id:            e.ID,
📝 Committable suggestion

Suggested change
result, err := a.eventService.Query(ctx, filter)
if err != nil {
return nil, &Error{
Code: api.ErrorCodeInternalError,
Message: "Failed to query event logs",
HTTPStatus: http.StatusInternalServerError,
}
}
entries := make([]api.EventLogEntry, 0, len(result.Entries))
for _, e := range result.Entries {
entry := api.EventLogEntry{
Id: e.ID,
SchemaVersion: e.SchemaVersion,
OccurredAt: e.OccurredAt,
RecordedAt: e.RecordedAt,
Kind: string(e.Kind),
Type: string(e.Type),
SourceService: e.SourceService,
}
if e.SourceInstance != "" {
entry.SourceInstance = &e.SourceInstance
}
if e.DAGName != "" {
entry.DagName = &e.DAGName
}
if e.DAGRunID != "" {
entry.DagRunId = &e.DAGRunID
}
if e.AttemptID != "" {
entry.AttemptId = &e.AttemptID
}
if e.SessionID != "" {
entry.SessionId = &e.SessionID
}
if e.UserID != "" {
entry.UserId = &e.UserID
}
if e.Model != "" {
entry.Model = &e.Model
}
if e.Status != "" {
entry.Status = &e.Status
}
if len(e.Data) > 0 {
data := make(map[string]any, len(e.Data))
maps.Copy(data, e.Data)
entry.Data = &data
}
entries = append(entries, entry)
result, err := a.eventService.Query(ctx, filter)
if err != nil {
return nil, &Error{
Code: api.ErrorCodeInternalError,
Message: "Failed to query event logs",
HTTPStatus: http.StatusInternalServerError,
}
}
if result == nil {
result = &eventstore.QueryResult{}
}
entries := make([]api.EventLogEntry, 0, len(result.Entries))
for _, e := range result.Entries {
if e == nil {
continue
}
entry := api.EventLogEntry{
Id: e.ID,
SchemaVersion: e.SchemaVersion,
OccurredAt: e.OccurredAt,
RecordedAt: e.RecordedAt,
Kind: string(e.Kind),
Type: string(e.Type),
SourceService: e.SourceService,
}
if e.SourceInstance != "" {
entry.SourceInstance = &e.SourceInstance
}
if e.DAGName != "" {
entry.DagName = &e.DAGName
}
if e.DAGRunID != "" {
entry.DagRunId = &e.DAGRunID
}
if e.AttemptID != "" {
entry.AttemptId = &e.AttemptID
}
if e.SessionID != "" {
entry.SessionId = &e.SessionID
}
if e.UserID != "" {
entry.UserId = &e.UserID
}
if e.Model != "" {
entry.Model = &e.Model
}
if e.Status != "" {
entry.Status = &e.Status
}
if len(e.Data) > 0 {
data := make(map[string]any, len(e.Data))
maps.Copy(data, e.Data)
entry.Data = &data
}
entries = append(entries, entry)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/events.go` around lines 81 - 130, The
handler should defensively handle a nil QueryResult and nil entries from
eventService.Query: after calling a.eventService.Query(ctx, filter) check if
result == nil and treat it like an empty result (returning zero entries) or
return a well-formed Error; when iterating result.Entries skip any nil element
(if e == nil continue) before accessing fields like e.ID, e.SchemaVersion, etc.;
also guard the make([...], len(result.Entries)) allocation by using 0 when
result is nil. Update the code paths around the symbols result, result.Entries,
and the loop over for _, e := range result.Entries in the function that calls
a.eventService.Query to avoid panics.

@yottahmd yottahmd merged commit e052405 into main Apr 1, 2026
5 checks passed
@yottahmd yottahmd deleted the eventfeed branch April 1, 2026 01:08
@codecov

codecov Bot commented Apr 1, 2026

Codecov Report

❌ Patch coverage is 55.23179% with 338 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.91%. Comparing base (2948ad9) to head (8a1a7cf).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/persis/fileeventstore/collector.go | 48.96% | 88 Missing and 35 partials ⚠️ |
| internal/persis/fileeventstore/store.go | 56.81% | 43 Missing and 33 partials ⚠️ |
| internal/service/eventstore/eventstore.go | 53.79% | 56 Missing and 17 partials ⚠️ |
| internal/agent/api.go | 8.33% | 20 Missing and 2 partials ⚠️ |
| internal/cmd/context.go | 67.34% | 10 Missing and 6 partials ⚠️ |
| internal/service/eventstore/context.go | 74.07% | 5 Missing and 2 partials ⚠️ |
| internal/cmn/config/config.go | 33.33% | 2 Missing and 2 partials ⚠️ |
| internal/cmn/config/loader.go | 81.25% | 2 Missing and 1 partial ⚠️ |
| internal/core/exec/enqueue_retry.go | 0.00% | 2 Missing and 1 partial ⚠️ |
| internal/cmd/coord.go | 86.66% | 0 Missing and 2 partials ⚠️ |

... and 5 more
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1885      +/-   ##
==========================================
- Coverage   69.11%   68.91%   -0.21%     
==========================================
  Files         447      452       +5     
  Lines       55361    56092     +731     
==========================================
+ Hits        38263    38653     +390     
- Misses      13695    13933     +238     
- Partials     3403     3506     +103     
| Files with missing lines | Coverage Δ |
|---|---|
| internal/cmn/config/key_hints.go | 45.45% <ø> (ø) |
| internal/cmn/config/path.go | 90.24% <100.00%> (+0.50%) ⬆️ |
| internal/persis/fileaudit/store.go | 72.64% <100.00%> (+0.26%) ⬆️ |
| internal/service/coordinator/handler.go | 65.87% <100.00%> (-0.26%) ⬇️ |
| internal/service/coordinator/service.go | 86.13% <100.00%> (+0.57%) ⬆️ |
| internal/cmd/scheduler.go | 78.78% <75.00%> (+0.66%) ⬆️ |
| internal/cmd/coord.go | 77.69% <86.66%> (+0.49%) ⬆️ |
| internal/cmd/startall.go | 51.04% <0.00%> (ø) |
| internal/cmn/fileutil/scanner.go | 50.00% <50.00%> (ø) |
| internal/persis/filedagrun/attempt.go | 72.33% <0.00%> (-0.49%) ⬇️ |

... and 10 more

... and 6 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update eaee029...8a1a7cf. Read the comment docs.

