Skip to content

fix(filequeue): process queue specified job properly#1457

Merged
yottahmd merged 9 commits intomainfrom
1437-fix-queue-issue
Dec 7, 2025
Merged

fix(filequeue): process queue specified job properly#1457
yottahmd merged 9 commits intomainfrom
1437-fix-queue-issue

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Dec 6, 2025

resolves #1437

Co-authored by @kriyanshii

Summary by CodeRabbit

  • Bug Fixes

    • More robust error handling when reading queued item payloads; items with missing/corrupt data are skipped and produce clearer error messages.
    • Queue processing validates item data before acting, reducing nil/error cases and flaky dequeue behavior.
  • Refactor

    • Queue persistence now lazily loads and caches payloads from storage and exposes stable item IDs for listings/dequeues.
    • Queue naming now derives from the DAG processing group for more consistent grouping across views and APIs.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Dec 6, 2025

Walkthrough

Queued item data moved to lazy, file-backed loading: Data() now returns (*DAGRunRef, error). File-queue types introduce QueuedFile that reads/unmarshals JSON on demand; callers now fetch, check, and dereference item data before further processing.

Changes

Cohort / File(s) Summary
Core interface
internal/core/execution/queue.go
Data() signature changed from Data() DAGRunRef to Data() (*DAGRunRef, error).
File-backed queue — types & loader
internal/persistence/filequeue/job.go, internal/persistence/filequeue/queuefile.go, internal/persistence/filequeue/store.go
Added QueuedFile (file path, cache, mutex), lazy loadData()/Data() returning (*DAGRunRef, error), ExtractJob() to materialize Job; queue ops updated to use file-backed items and new return types.
File-backed queue — list/pop behavior
internal/persistence/filequeue/queue.go
DualQueue.DequeueByDAGRunID now appends slices in one call (merge change).
File-backed queue — tests
internal/persistence/filequeue/*.go tests: job_test.go, queue_test.go, queuefile_test.go, store_test.go
Tests adapted to NewQueuedFile(file), Data() (*..., error), ExtractJob(), lazy-loading/caching behavior, and to assert via ID() where files are removed.
Queue callers — CLI / scheduler / frontend
internal/cmd/dequeue.go, internal/service/scheduler/queue_processor.go, internal/service/scheduler/queue_processor_test.go, internal/service/frontend/api/v2/queues.go
Callers now call data, err := item.Data(), handle errors, dereference *DAGRunRef for downstream calls; frontend derives queue names from DAG attempt ProcGroup and loads DAG from attempt before status checks.
Integration tests
internal/integration/distributed_e2e_test.go, internal/integration/distributed_test.go
Tests updated to receive (data, err) from Data(), assert require.NoError(err) before using data, and dereference pointer fields (e.g., data.ID).
Small misc tests/changes
internal/persistence/filequeue/store.go, other small test adjustments
ListByDAGName and All iterate QueuedFile items, log and skip items on load errors; tests altered to avoid calling Data() after dequeue where files may be removed.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor CLI
    participant FileQueue as FileQueue (QueuedFile)
    participant Scheduler
    participant DagAttemptStore as AttemptStore
    participant DAG as DAGLoader

    CLI->>FileQueue: Push (write JSON file)
    Note right of FileQueue: persisted as file + filename metadata

    Scheduler->>FileQueue: List/Pop -> returns QueuedFile (file path)
    Scheduler->>FileQueue: item.Data()
    FileQueue-->>Scheduler: (*DAGRunRef, error)
    alt data loaded
        Scheduler->>AttemptStore: FindAttempt(*dagRunRef)
        AttemptStore-->>Scheduler: Attempt
        Scheduler->>DAG: Attempt.ReadDAG(ctx)
        DAG-->>Scheduler: DAG (or error)
        alt DAG loaded
            Scheduler->>Scheduler: processDAG(runRef := *dagRunRef)
        else DAG load error
            Scheduler-->>Scheduler: log & skip item
        end
    else load error
        Scheduler-->>Scheduler: log & skip item
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review callers of Data() across scheduler, frontend, CLI, and integration tests for correct error handling and nil dereference protection.
  • Inspect QueuedFile.loadData() for locking, caching, JSON unmarshalling, path handling, and error-wrapping.
  • Validate conversions between QueuedFile, Job, and execution.QueuedItemData across queue APIs.
  • Confirm frontend change: queue name derivation from Attempt.ProcGroup and DAG loading/error flows.

Poem

🐰
I hop through files that whisper names,
I peek, I parse, I fetch with aims.
If JSON hides or crumbs go cold,
I skip, I log — still brave and bold.
A pointer found, the run proceeds.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.43% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main objective: fixing queue job processing by improving how queued DAGs are handled and loaded.
Linked Issues check ✅ Passed The PR addresses issue #1437 by fixing the queue job processing to properly extract and use DAG run references with explicit error handling throughout the call chain.
Out of Scope Changes check ✅ Passed All changes are scoped to fixing queue item data retrieval and error handling; no unrelated refactoring or feature additions are present.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch 1437-fix-queue-issue

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal/persistence/filequeue/job.go (1)

14-18: Consider documenting the dual-source design.

The Job struct embeds ItemData (used for ID extraction in constructor) but Data() reads from the file path. This is intentional - the file is the source of truth for data retrieval. Consider adding a brief comment to clarify this relationship for future maintainers.

 type Job struct {
-	id   string
-	file string
+	id   string // derived from ItemData.FileName during construction
+	file string // source of truth for Data() calls
 	ItemData
 }
internal/persistence/filequeue/store.go (1)

191-199: Consider returning partial results with warning or collecting errors.

Currently, items with data retrieval errors are silently skipped (only logged). While this prevents one corrupted queue file from breaking the entire list operation, callers have no way to know that items were skipped.

Consider one of these alternatives:

  1. Return a separate slice of errors alongside results
  2. Add a warning-level log with the skipped item count
  3. Document this skip behavior in the method's docstring
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 07929d9 and c95453b.

📒 Files selected for processing (13)
  • internal/cmd/dequeue.go (1 hunks)
  • internal/core/execution/queue.go (1 hunks)
  • internal/integration/distributed_e2e_test.go (1 hunks)
  • internal/integration/distributed_test.go (3 hunks)
  • internal/persistence/filequeue/job.go (3 hunks)
  • internal/persistence/filequeue/job_test.go (2 hunks)
  • internal/persistence/filequeue/queue_test.go (4 hunks)
  • internal/persistence/filequeue/queuefile.go (8 hunks)
  • internal/persistence/filequeue/store.go (2 hunks)
  • internal/persistence/filequeue/store_test.go (3 hunks)
  • internal/service/frontend/api/v2/queues.go (2 hunks)
  • internal/service/scheduler/queue_processor.go (2 hunks)
  • internal/service/scheduler/queue_processor_test.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/core/execution/queue.go
  • internal/integration/distributed_test.go
  • internal/persistence/filequeue/store.go
  • internal/persistence/filequeue/job_test.go
  • internal/persistence/filequeue/queuefile.go
  • internal/integration/distributed_e2e_test.go
  • internal/persistence/filequeue/job.go
  • internal/cmd/dequeue.go
  • internal/service/frontend/api/v2/queues.go
  • internal/persistence/filequeue/queue_test.go
  • internal/persistence/filequeue/store_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/scheduler/queue_processor_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/integration/distributed_test.go
  • internal/persistence/filequeue/job_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/persistence/filequeue/queue_test.go
  • internal/persistence/filequeue/store_test.go
  • internal/service/scheduler/queue_processor_test.go
🧬 Code graph analysis (12)
internal/core/execution/queue.go (1)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/integration/distributed_test.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
internal/persistence/filequeue/store.go (1)
internal/persistence/filequeue/job.go (1)
  • NewJob (20-29)
internal/persistence/filequeue/job_test.go (3)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (62-66)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/persistence/filequeue/job.go (1)
  • NewJob (20-29)
internal/persistence/filequeue/queuefile.go (3)
internal/common/logger/tag/tag.go (3)
  • File (72-74)
  • Error (20-22)
  • Name (271-273)
internal/persistence/filequeue/job.go (1)
  • NewJob (20-29)
internal/common/logger/context.go (2)
  • Error (50-52)
  • Warn (45-47)
internal/integration/distributed_e2e_test.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
internal/persistence/filequeue/job.go (2)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (62-66)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/cmd/dequeue.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
internal/persistence/filequeue/queue_test.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
internal/persistence/filequeue/store_test.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
internal/service/scheduler/queue_processor.go (4)
internal/runtime/data.go (1)
  • Data (17-20)
internal/common/logger/tag/tag.go (3)
  • Error (20-22)
  • RunID (40-42)
  • ID (276-278)
internal/common/logger/context.go (1)
  • Error (50-52)
internal/common/logger/logger.go (1)
  • WithValues (83-89)
internal/service/scheduler/queue_processor_test.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (19)
internal/core/execution/queue.go (1)

70-70: Interface change enables proper error handling for queue data retrieval.

The signature change from Data() DAGRunRef to Data() (*DAGRunRef, error) is the core fix for issue #1437. This allows the implementation to lazily load queue item data from the file system and properly propagate errors (like "name or path is required") to callers.

internal/persistence/filequeue/job.go (1)

37-49: Data() now correctly loads DAGRunRef from file - this is the fix for #1437.

The implementation properly reads the queue file, unmarshals the JSON, and returns the DAGRunRef. This fixes the original error "failed to load DAG from : name or path is required" by ensuring the DAG name and ID are actually loaded from the persisted queue file.

One consideration: Data() reads the file on every call. If this method is called frequently for the same item, consider caching the result. However, for the current usage pattern (called once during dequeue/processing), this is acceptable.

internal/service/scheduler/queue_processor_test.go (1)

100-109: Test correctly updated for new Data() API.

The test properly handles the new error return from Data() and uses require.NoError to fail fast if data retrieval fails. The comparison logic using data.ID is correct.

internal/cmd/dequeue.go (1)

62-66: Proper error handling and pointer dereferencing for dequeue operation.

The code correctly:

  1. Retrieves data with error handling
  2. Provides a descriptive error message if data retrieval fails
  3. Dereferences *data safely (after error check) to pass value to dequeueDAGRun
internal/integration/distributed_e2e_test.go (1)

74-78: E2E test correctly adapted for new Data() API.

The test now properly handles the error return from Data() and continues to log the DAG name and run ID. The require.NoError assertion ensures test failure if data cannot be retrieved.

internal/integration/distributed_test.go (3)

68-71: Test correctly handles new Data() API for queue verification.


160-165: Properly extracts DAGRunRef value for subsequent dequeue operation.

The code correctly dereferences *data to assign to the dagRun variable of type execution.DAGRunRef, which is then used for DequeueByDAGRunID on line 168.


188-192: Retry verification correctly uses new Data() API.

The test properly asserts that the retry was enqueued with the same DAG run ID.

internal/persistence/filequeue/store.go (1)

109-109: NewJob call correctly updated to include file path.

The file path is now passed as the first argument to NewJob, enabling the lazy-loading behavior in Data().

internal/persistence/filequeue/job_test.go (2)

13-44: Comprehensive test setup with file-based data.

The test correctly:

  1. Creates a temp directory and file with valid JSON content
  2. Passes the file path to NewJob
  3. Validates both ID extraction and Data() retrieval

This properly exercises the new file-backed implementation.


46-61: Good error case coverage for missing file.

TestJob_DataError validates that Data() returns an error when the backing file doesn't exist. This is important for handling edge cases like deleted queue files. As per coding guidelines, this covers the failure path appropriately.

internal/service/scheduler/queue_processor.go (2)

290-298: LGTM! This addresses the root cause of the reported issue.

The explicit error handling for item.Data() resolves the "name or path is required" error mentioned in issue #1437. The pattern correctly retrieves data once, handles errors, and reuses the data reference.

Note: Similar to line 276, verify that Data() never returns (nil, nil) when error is nil to prevent potential panic at line 298.


271-276: The code is safe as written. The Data() method on execution.QueuedItemData follows the standard Go error handling contract: when it returns a non-nil error, the data pointer is nil; when it returns no error, the pointer is guaranteed to be non-nil. The implementation in internal/persistence/filequeue/job.go confirms this by returning &itemData.DAGRun (always a valid pointer) on success or (nil, error) on failure. Dereferencing *data after the if err != nil check is safe and requires no additional nil validation.

Likely an incorrect or invalid review comment.

internal/persistence/filequeue/queue_test.go (1)

47-52: LGTM! Test updates correctly align with the new Data() API.

The tests are properly updated to handle the new error-returning signature of Data() and appropriately use ID() for verification after dequeue operations. The comments clearly explain why Data() cannot be called after dequeue (file is removed), which is helpful for maintainability.

Also applies to: 63-63, 92-95, 101-104, 126-127, 131-131, 146-149

internal/persistence/filequeue/store_test.go (1)

43-48: LGTM! Store test updates are consistent with the new API.

The changes mirror the pattern from queue_test.go and correctly handle the error-returning Data() method. Using ID() for verification after dequeue and adding error checks for Data() calls are both appropriate.

Also applies to: 79-87, 146-149

internal/service/frontend/api/v2/queues.go (2)

91-97: LGTM! Error handling for queued items is appropriate.

Skipping items that fail data retrieval is reasonable for a listing operation. The error handling prevents the reported "name or path is required" issue from affecting the queue list endpoint.

Note: Ensure Data() never returns (nil, nil) to prevent potential panic at line 97 when dereferencing *dagRunRef.


107-107: No changes needed. The usage of dag.ProcGroup() at line 107 is correct and consistent with how queues are identified throughout the codebase. It respects the DAG's configured queue (if set via dag.Queue) rather than using the DAG name, which aligns with queue operations elsewhere.

Likely an incorrect or invalid review comment.

internal/persistence/filequeue/queuefile.go (2)

172-190: LGTM! Error handling for Data() retrieval is correct.

The code properly handles errors when retrieving job data and safely continues processing other items on failure. The pattern of calling Data() to validate file content before removal is safer than relying solely on filename-parsed metadata.

Note: While item.DAGRun.ID and item.DAGRun.Name are available from the Item struct, calling Data() validates the actual file content, which provides better correctness guarantees.


316-316: LGTM! The Item wrapper pattern is well-integrated.

The change to return []Item from listItems (lines 316, 326, 341-344) provides callers with both the file path and parsed metadata, enabling the file-backed data retrieval pattern used throughout the PR. This is a clean internal refactoring that supports the error-handling improvements.

Also applies to: 326-344

Comment thread internal/persistence/filequeue/queuefile.go Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
internal/persistence/filequeue/queuefile.go (1)

55-59: Unexport the Item struct.

The Item struct is only used internally by listItems() and is not part of the public API. Rename to item to reduce the exported API surface.

-// Item represents a queued item in the queue file
-type Item struct {
+// item represents a queued item in the queue file
+type item struct {
 	File string
 	ItemData
 }

Also update listItems return type and usages:

-func (q *QueueFile) listItems(ctx context.Context) ([]Item, error) {
+func (q *QueueFile) listItems(ctx context.Context) ([]item, error) {
 	...
-	var items []Item
+	var items []item
 	...
-		items = append(items, Item{
+		items = append(items, item{
🧹 Nitpick comments (1)
internal/persistence/filequeue/job.go (1)

98-100: Simplify selector by removing embedded field name.

The static analysis tool correctly identifies that j.ItemData.DAGRun can be simplified to j.DAGRun since ItemData is embedded in Job.

 func (j *Job) Data() (*execution.DAGRunRef, error) {
-	return &j.ItemData.DAGRun, nil
+	return &j.DAGRun, nil
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c95453b and 670d667.

📒 Files selected for processing (5)
  • internal/persistence/filequeue/job.go (1 hunks)
  • internal/persistence/filequeue/job_test.go (1 hunks)
  • internal/persistence/filequeue/queuefile.go (10 hunks)
  • internal/persistence/filequeue/queuefile_test.go (3 hunks)
  • internal/persistence/filequeue/store.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/persistence/filequeue/job_test.go
  • internal/persistence/filequeue/queuefile.go
  • internal/persistence/filequeue/queuefile_test.go
  • internal/persistence/filequeue/store.go
  • internal/persistence/filequeue/job.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/persistence/filequeue/job_test.go
  • internal/persistence/filequeue/queuefile_test.go
🧬 Code graph analysis (4)
internal/persistence/filequeue/job_test.go (3)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (62-66)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/persistence/filequeue/job.go (1)
  • NewJob (23-31)
internal/persistence/filequeue/queuefile.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/core/execution/queue.go (1)
  • QueuedItemData (66-71)
internal/persistence/filequeue/job.go (2)
  • NewJob (23-31)
  • QueuedFile (15-21)
internal/persistence/filequeue/store.go (5)
internal/persistence/filequeue/job.go (1)
  • NewJob (23-31)
internal/runtime/data.go (1)
  • Data (17-20)
api/v2/api.gen.go (1)
  • Error (432-441)
internal/common/logger/tag/tag.go (2)
  • Error (20-22)
  • Name (271-273)
internal/common/logger/context.go (1)
  • Error (50-52)
internal/persistence/filequeue/job.go (3)
internal/core/execution/queue.go (1)
  • QueuedItemData (66-71)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (62-66)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
🪛 GitHub Check: Go Linter
internal/persistence/filequeue/job.go

[failure] 99-99:
QF1008: could remove embedded field "ItemData" from selector (staticcheck)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (16)
internal/persistence/filequeue/job_test.go (4)

13-44: LGTM!

The test correctly validates the lazy-loading behavior of QueuedFile: creates a temp file with proper JSON, verifies ID extraction from filename, and confirms Data() returns the correct DAGRunRef fields.


46-55: LGTM!

Good coverage of the error path when the underlying file doesn't exist.


57-91: LGTM!

This test validates a critical aspect of the fix: ExtractJob() caches the data so that after the file is deleted (simulating what happens after Pop()), the job data remains accessible. This directly addresses the original issue.


93-130: LGTM!

The caching test properly validates that once Data() is called, subsequent calls use the cached value even if the file is deleted.

internal/persistence/filequeue/job.go (4)

13-21: LGTM!

The QueuedFile struct correctly implements lazy loading with proper synchronization via mutex. The interface satisfaction assertion ensures compile-time checking.


23-31: LGTM!

The constructor correctly derives the ID from the filename by stripping the extension, which aligns with how queue files are named.


47-69: LGTM!

The loadData() implementation correctly:

  1. Uses mutex to protect cache access
  2. Returns cached data on subsequent calls
  3. Wraps errors with context for debugging

71-83: LGTM!

ExtractJob() correctly creates a Job with a copy of the data, ensuring the job retains its data even after the source file is deleted.

internal/persistence/filequeue/store.go (2)

101-103: LGTM!

Correctly updated to use NewJob(file) which now handles lazy loading of queue item data.


185-195: LGTM!

Good error handling for the new Data() signature. Logging and skipping items that fail to load is appropriate for a listing operation.

internal/persistence/filequeue/queuefile_test.go (3)

49-54: LGTM!

Correctly updated to handle the new Data() signature that returns (*DAGRunRef, error) and properly asserts on the returned fields.


90-96: LGTM!

Test correctly validates Data() retrieval after FindByDAGRunID and verifies the ID prefix format derived from the filename.


126-130: LGTM!

Test correctly validates Data() retrieval after PopByDAGRunID operation.

internal/persistence/filequeue/queuefile.go (3)

170-199: LGTM!

The PopByDAGRunID implementation correctly:

  1. Creates a QueuedFile via NewJob(item.File)
  2. Loads data to check for matching DAGRunRef
  3. Calls ExtractJob() to cache data before file removal
  4. Removes file only after successful extraction
  5. Logs errors and continues processing remaining items

255-280: LGTM!

The Pop implementation correctly extracts job data before removing the file, ensuring the returned item has cached data accessible even after the source file is deleted. This is the core fix for issue #1437.


334-362: LGTM!

The listItems function correctly wraps each ItemData with its file path in the new Item struct, enabling callers to create lazy-loading QueuedFile instances.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
internal/persistence/filequeue/job.go (2)

47-69: Lazy loading and caching logic is sound

The loadData implementation (mutex, JSON read, unmarshal, cache) looks correct and concurrency-safe, and the shared cache avoids repeated disk reads. No correctness issues spotted here; any further optimization (e.g., sync.RWMutex for read-heavy access) would be optional and driven by profiling.


23-36: Clarify naming and comments around NewJob, QueuedFile, and Job

A few small consistency nits that may help future readers:

  • NewJob returns a *QueuedFile, while there is also a Job type; this can be slightly confusing at call sites. Consider renaming to something like NewQueuedFile or adjusting the doc comment to clarify that it constructs the file-backed queued item wrapper.
  • The comment // ID implements models.QueuedJob. is out of date now that you’re using execution.QueuedItemData.
  • Both QueuedFile and Job assert execution.QueuedItemData. If only one of them is meant to be the primary public implementation, you might drop the other assertion or add a brief comment explaining why both are exposed.

These are purely polish; the code is correct as written.

Also applies to: 71-83, 85-92

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 670d667 and 7a1cdc7.

📒 Files selected for processing (2)
  • internal/persistence/filequeue/job.go (1 hunks)
  • internal/persistence/filequeue/queue.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/persistence/filequeue/queue.go
  • internal/persistence/filequeue/job.go
🧬 Code graph analysis (1)
internal/persistence/filequeue/job.go (4)
internal/core/execution/queue.go (1)
  • QueuedItemData (66-71)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (62-66)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/persistence/filedagrun/dagrun.go (1)
  • DAGRun (48-52)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (2)
internal/persistence/filequeue/queue.go (1)

88-100: Bulk append change in DequeueByDAGRunID looks good

Switching from per-item appends to items = append(items, popped...) is functionally equivalent, simpler, and slightly more efficient. No correctness or concurrency concerns here.

internal/persistence/filequeue/job.go (1)

39-45: All call sites of Data() already handle errors correctly

The behavior change to return (*execution.DAGRunRef, error) has been properly implemented across the codebase. A scan of all 18 usages confirms that every call site checks the error:

  • Queue file operations in store.go, queuefile.go: error logged
  • API endpoint in queues.go: error skipped gracefully with continue
  • Scheduler in queue_processor.go: error logged
  • CLI in dequeue.go: error propagated with context
  • All tests: errors asserted with require.NoError()

No call sites proceed with zero/empty DAGRunRef on error—all are safe.

@yottahmd yottahmd changed the title fix: process queue specified job properly fix(filequeue): process queue specified job properly Dec 6, 2025
@yottahmd
Copy link
Copy Markdown
Collaborator Author

yottahmd commented Dec 6, 2025

@coderabbitai generate docstrings

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Dec 6, 2025

✅ Actions performed

Initiated docstring generation; will generate only if new commits exist.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Dec 6, 2025

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #1458

coderabbitai Bot added a commit that referenced this pull request Dec 6, 2025
Docstrings generation was requested by @yottahmd.

* #1457 (comment)

The following files were modified:

* `internal/cmd/dequeue.go`
* `internal/persistence/filequeue/job.go`
* `internal/persistence/filequeue/queuefile.go`
Docstrings generation was requested by @yottahmd.

* #1457 (comment)

The following files were modified:

* `internal/cmd/dequeue.go`
* `internal/persistence/filequeue/job.go`
* `internal/persistence/filequeue/queuefile.go`

<details>
<summary>These files were ignored</summary>

* `internal/integration/distributed_e2e_test.go`
* `internal/integration/distributed_test.go`
* `internal/persistence/filequeue/job_test.go`
* `internal/persistence/filequeue/queue_test.go`
* `internal/persistence/filequeue/queuefile_test.go`
* `internal/persistence/filequeue/store_test.go`
* `internal/service/scheduler/queue_processor_test.go`

</details>

<details>
<summary>ℹ️ Note</summary><blockquote>

CodeRabbit cannot perform edits on its own pull requests yet.

</blockquote></details>

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
internal/persistence/filequeue/queuefile.go (2)

46-55: NewQueueFile docs align with implementation, could add an explicit example

The constructor and compiled regex match the described pattern for queue files; to make this even clearer for future maintainers, consider adding a concrete sample filename (e.g., item_high_20250101_120000_123456789Z_<runID>.json) to the comment.


150-205: PopByDAGRunID logic correctly filters and unwraps matching jobs

Using NewQueuedFile(item.file) and checking data.Name/data.ID before ExtractJob() and file removal ensures only matching DAG runs are popped, while logging and continuing on errors avoids aborting the whole batch. Note that item.data.DAGRun already holds the parsed name/ID; if this becomes performance‑sensitive, you could filter using item.data.DAGRun first and defer JSON loading to only the matching items.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7a1cdc7 and 3888cdf.

📒 Files selected for processing (5)
  • internal/cmd/dequeue.go (3 hunks)
  • internal/persistence/filequeue/job.go (1 hunks)
  • internal/persistence/filequeue/job_test.go (1 hunks)
  • internal/persistence/filequeue/queuefile.go (13 hunks)
  • internal/persistence/filequeue/store.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/persistence/filequeue/store.go
  • internal/cmd/dequeue.go
  • internal/persistence/filequeue/job_test.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/persistence/filequeue/queuefile.go
  • internal/persistence/filequeue/job.go
🧬 Code graph analysis (2)
internal/persistence/filequeue/queuefile.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
internal/core/execution/queue.go (1)
  • QueuedItemData (66-71)
internal/persistence/filequeue/job.go (2)
  • NewQueuedFile (26-34)
  • QueuedFile (15-21)
internal/persistence/filequeue/job.go (3)
internal/core/execution/queue.go (1)
  • QueuedItemData (66-71)
internal/persistence/filequeue/queuefile.go (1)
  • ItemData (63-67)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (144-147)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (6)
internal/persistence/filequeue/job.go (3)

15-34: QueuedFile wrapper and ID derivation look sound

Using the base filename without extension as the queued item ID and keeping the full path in file cleanly separates queue identity from DAG run identity and matches execution.QueuedItemData expectations. The constructor is straightforward and side‑effect free, which is good for tests and callers.


41-72: Lazy load + cache implementation is correct and thread‑safe

Data() delegating to loadData() with a sync.Mutex and a simple nil cache check ensures each file is read/unmarshalled at most once per QueuedFile, and the returned *DAGRunRef is stable because it points into the cached ItemData. Error wrapping clearly distinguishes read vs unmarshal failures.


74-101: Separation between on‑disk QueuedFile and in‑memory Job is well designed

ExtractJob() creating a Job that embeds ItemData (copying it) avoids tying consumers to filesystem concerns while still satisfying execution.QueuedItemData, and Job.Data() returning &j.DAGRun keeps the interface uniform with QueuedFile.Data().

internal/persistence/filequeue/queuefile.go (3)

57-60: queuedItem + listItems refactor improves clarity and preserves ordering

Wrapping each entry as {file, data ItemData} keeps the path and parsed metadata together, and sorting by data.QueuedAt maintains FIFO semantics based on the encoded timestamp. The shared parseQueueFileName helper is a nice centralization of filename parsing logic.

Also applies to: 335-371


233-282: Pop now returning QueuedItemData via Job is consistent and robust

Unwrapping the first item with NewQueuedFile(item.file).ExtractJob(), logging a detailed error on failure, and only removing the queue file after a successful unwrap provides a safer pop semantics, while still satisfying the execution.QueuedItemData interface for callers.


374-410: parseQueueFileName helper correctly reconstructs ItemData from filenames

The regex groups and reconstruction of QueuedAt from (timestamp, nanos) match the format produced by queueFileName, and using filepath.Base(filepath.Dir(path)) for DAGRun.Name keeps the DAG name derivation consistent with the directory layout. Renaming the local variable to data improves readability over the previous generic name.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/persistence/filequeue/queuefile.go (1)

150-206: Consider more defensive handling of corrupted queue entries in Pop / PopByDAGRunID

Right now:

  • Pop returns an error and leaves the first file in place if ExtractJob() fails, so a single corrupted JSON file can permanently block popping from that queue until manual cleanup.
  • PopByDAGRunID logs and continues on Data()/ExtractJob() errors, which is more forgiving.

For better robustness, consider normalizing behavior—for example, logging and either deleting or quarantining clearly corrupted entries, or skipping to the next valid item—so a bad file cannot indefinitely block dequeues.

Also applies to: 208-231, 233-283, 285-309

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3888cdf and c1958f3.

📒 Files selected for processing (3)
  • internal/cmd/dequeue.go (2 hunks)
  • internal/persistence/filequeue/job.go (1 hunks)
  • internal/persistence/filequeue/queuefile.go (12 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/cmd/dequeue.go
  • internal/persistence/filequeue/job.go
  • internal/persistence/filequeue/queuefile.go
🧬 Code graph analysis (1)
internal/cmd/dequeue.go (1)
internal/runtime/data.go (1)
  • Data (17-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (3)
internal/persistence/filequeue/job.go (1)

13-85: QueuedFile / Job implementations of QueuedItemData look solid

Lazy, file‑backed loading with a per‑instance mutex and caching is correct, and Job cleanly wraps ItemData while still satisfying execution.QueuedItemData. Error wrapping provides useful file context, and the Data() (*DAGRunRef, error) shape aligns with the updated queue API without introducing concurrency or correctness issues.

Also applies to: 87-101

internal/persistence/filequeue/queuefile.go (1)

46-55: Filename parsing + queuedItem metadata pipeline look correct

The combination of NewQueueFile’s compiled regexp, queuedItem {file, data}, listItems, and parseQueueFileName correctly reconstructs ItemData from filenames, filters unexpected files, and sorts by QueuedAt. The regexes line up with queueFileName, and malformed names are logged and skipped rather than breaking queue operations. No issues spotted here.

Also applies to: 57-60, 335-372, 374-410

internal/cmd/dequeue.go (1)

48-71: dequeueFirst correctly adapts to the new QueuedItemData.Data() contract

The added queues‑enabled guard, explicit nil‑item check, and propagation of item.Data() errors are all appropriate, and passing the dereferenced DAGRunRef into dequeueDAGRun with alreadyDequeued=true matches the store semantics. This path now fails fast on configuration or dequeue/data issues without introducing new failure modes.

@yottahmd yottahmd merged commit b2da767 into main Dec 7, 2025
5 checks passed
@yottahmd yottahmd deleted the 1437-fix-queue-issue branch December 7, 2025 05:47
@codecov
Copy link
Copy Markdown

codecov Bot commented Dec 8, 2025

Codecov Report

❌ Patch coverage is 60.74766% with 42 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.82%. Comparing base (07929d9) to head (c1958f3).
⚠️ Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
internal/persistence/filequeue/queuefile.go 54.00% 18 Missing and 5 partials ⚠️
internal/service/scheduler/queue_processor.go 27.27% 7 Missing and 1 partial ⚠️
internal/persistence/filequeue/store.go 16.66% 5 Missing ⚠️
internal/persistence/filequeue/job.go 88.57% 2 Missing and 2 partials ⚠️
internal/cmd/dequeue.go 50.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1457      +/-   ##
==========================================
- Coverage   59.86%   59.82%   -0.04%     
==========================================
  Files         188      188              
  Lines       21167    21299     +132     
==========================================
+ Hits        12671    12743      +72     
- Misses       7180     7229      +49     
- Partials     1316     1327      +11     
Files with missing lines Coverage Δ
internal/core/execution/queue.go 0.00% <ø> (ø)
internal/persistence/filequeue/queue.go 78.21% <100.00%> (-0.22%) ⬇️
internal/cmd/dequeue.go 51.31% <50.00%> (-0.74%) ⬇️
internal/persistence/filequeue/job.go 89.74% <88.57%> (-10.26%) ⬇️
internal/persistence/filequeue/store.go 45.39% <16.66%> (-0.06%) ⬇️
internal/service/scheduler/queue_processor.go 38.84% <27.27%> (-0.84%) ⬇️
internal/persistence/filequeue/queuefile.go 56.48% <54.00%> (-3.73%) ⬇️

... and 9 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 07929d9...c1958f3. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Queued Dags are not getting started

1 participant