fix(filequeue): process queue specified job properly #1457
Walkthrough
Queued item data moved to lazy, file-backed loading:
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
actor CLI
participant FileQueue as FileQueue (QueuedFile)
participant Scheduler
participant DagAttemptStore as AttemptStore
participant DAG as DAGLoader
CLI->>FileQueue: Push (write JSON file)
Note right of FileQueue: persisted as file + filename metadata
Scheduler->>FileQueue: List/Pop -> returns QueuedFile (file path)
Scheduler->>FileQueue: item.Data()
FileQueue-->>Scheduler: (*DAGRunRef, error)
alt data loaded
Scheduler->>AttemptStore: FindAttempt(*dagRunRef)
AttemptStore-->>Scheduler: Attempt
Scheduler->>DAG: Attempt.ReadDAG(ctx)
DAG-->>Scheduler: DAG (or error)
alt DAG loaded
Scheduler->>Scheduler: processDAG(runRef := *dagRunRef)
else DAG load error
Scheduler-->>Scheduler: log & skip item
end
else load error
Scheduler-->>Scheduler: log & skip item
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal/persistence/filequeue/job.go (1)
14-18: Consider documenting the dual-source design.
The Job struct embeds ItemData (used for ID extraction in the constructor), but Data() reads from the file path. This is intentional: the file is the source of truth for data retrieval. Consider adding a brief comment to clarify this relationship for future maintainers.
 type Job struct {
-	id   string
-	file string
+	id   string // derived from ItemData.FileName during construction
+	file string // source of truth for Data() calls
 	ItemData
 }
internal/persistence/filequeue/store.go (1)
191-199: Consider returning partial results with warning or collecting errors.
Currently, items with data retrieval errors are skipped and only logged. While this prevents one corrupted queue file from breaking the entire list operation, callers have no way to know that items were skipped.
Consider one of these alternatives:
- Return a separate slice of errors alongside results
- Add a warning-level log with the skipped item count
- Document this skip behavior in the method's docstring
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
internal/cmd/dequeue.go (1 hunks)
internal/core/execution/queue.go (1 hunks)
internal/integration/distributed_e2e_test.go (1 hunks)
internal/integration/distributed_test.go (3 hunks)
internal/persistence/filequeue/job.go (3 hunks)
internal/persistence/filequeue/job_test.go (2 hunks)
internal/persistence/filequeue/queue_test.go (4 hunks)
internal/persistence/filequeue/queuefile.go (8 hunks)
internal/persistence/filequeue/store.go (2 hunks)
internal/persistence/filequeue/store_test.go (3 hunks)
internal/service/frontend/api/v2/queues.go (2 hunks)
internal/service/scheduler/queue_processor.go (2 hunks)
internal/service/scheduler/queue_processor_test.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common
Files:
internal/core/execution/queue.go
internal/integration/distributed_test.go
internal/persistence/filequeue/store.go
internal/persistence/filequeue/job_test.go
internal/persistence/filequeue/queuefile.go
internal/integration/distributed_e2e_test.go
internal/persistence/filequeue/job.go
internal/cmd/dequeue.go
internal/service/frontend/api/v2/queues.go
internal/persistence/filequeue/queue_test.go
internal/persistence/filequeue/store_test.go
internal/service/scheduler/queue_processor.go
internal/service/scheduler/queue_processor_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks
Files:
internal/integration/distributed_test.go
internal/persistence/filequeue/job_test.go
internal/integration/distributed_e2e_test.go
internal/persistence/filequeue/queue_test.go
internal/persistence/filequeue/store_test.go
internal/service/scheduler/queue_processor_test.go
🧬 Code graph analysis (12)
internal/core/execution/queue.go (1)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/integration/distributed_test.go (1)
internal/runtime/data.go (1)
Data(17-20)
internal/persistence/filequeue/store.go (1)
internal/persistence/filequeue/job.go (1)
NewJob(20-29)
internal/persistence/filequeue/job_test.go (3)
internal/persistence/filequeue/queuefile.go (1)
ItemData(62-66)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/persistence/filequeue/job.go (1)
NewJob(20-29)
internal/persistence/filequeue/queuefile.go (3)
internal/common/logger/tag/tag.go (3)
File(72-74)
Error(20-22)
Name(271-273)
internal/persistence/filequeue/job.go (1)
NewJob(20-29)
internal/common/logger/context.go (2)
Error(50-52)
Warn(45-47)
internal/integration/distributed_e2e_test.go (1)
internal/runtime/data.go (1)
Data(17-20)
internal/persistence/filequeue/job.go (2)
internal/persistence/filequeue/queuefile.go (1)
ItemData(62-66)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/cmd/dequeue.go (1)
internal/runtime/data.go (1)
Data(17-20)
internal/persistence/filequeue/queue_test.go (1)
internal/runtime/data.go (1)
Data(17-20)
internal/persistence/filequeue/store_test.go (1)
internal/runtime/data.go (1)
Data(17-20)
internal/service/scheduler/queue_processor.go (4)
internal/runtime/data.go (1)
Data(17-20)
internal/common/logger/tag/tag.go (3)
Error(20-22)
RunID(40-42)
ID(276-278)
internal/common/logger/context.go (1)
Error(50-52)
internal/common/logger/logger.go (1)
WithValues(83-89)
internal/service/scheduler/queue_processor_test.go (1)
internal/runtime/data.go (1)
Data(17-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (19)
internal/core/execution/queue.go (1)
70-70: Interface change enables proper error handling for queue data retrieval.
The signature change from Data() DAGRunRef to Data() (*DAGRunRef, error) is the core fix for issue #1437. This allows the implementation to lazily load queue item data from the file system and properly propagate errors (like "name or path is required") to callers.
internal/persistence/filequeue/job.go (1)
37-49: Data() now correctly loads DAGRunRef from file; this is the fix for #1437.
The implementation properly reads the queue file, unmarshals the JSON, and returns the DAGRunRef. This fixes the original error "failed to load DAG from : name or path is required" by ensuring the DAG name and ID are actually loaded from the persisted queue file.
One consideration: Data() reads the file on every call. If this method is called frequently for the same item, consider caching the result. However, for the current usage pattern (called once during dequeue/processing), this is acceptable.
internal/service/scheduler/queue_processor_test.go (1)
100-109: Test correctly updated for new Data() API.
The test properly handles the new error return from Data() and uses require.NoError to fail fast if data retrieval fails. The comparison logic using data.ID is correct.
internal/cmd/dequeue.go (1)
62-66: Proper error handling and pointer dereferencing for dequeue operation.
The code correctly:
- Retrieves data with error handling
- Provides a descriptive error message if data retrieval fails
- Dereferences *data safely (after the error check) to pass the value to dequeueDAGRun
internal/integration/distributed_e2e_test.go (1)
74-78: E2E test correctly adapted for new Data() API.
The test now properly handles the error return from Data() and continues to log the DAG name and run ID. The require.NoError assertion ensures test failure if data cannot be retrieved.
internal/integration/distributed_test.go (3)
68-71: Test correctly handles new Data() API for queue verification.
160-165: Properly extracts DAGRunRef value for subsequent dequeue operation.
The code correctly dereferences *data to assign to the dagRun variable of type execution.DAGRunRef, which is then used for DequeueByDAGRunID on line 168.
188-192: Retry verification correctly uses new Data() API.
The test properly asserts that the retry was enqueued with the same DAG run ID.
internal/persistence/filequeue/store.go (1)
109-109: NewJob call correctly updated to include file path.
The file path is now passed as the first argument to NewJob, enabling the lazy-loading behavior in Data().
internal/persistence/filequeue/job_test.go (2)
13-44: Comprehensive test setup with file-based data.
The test correctly:
- Creates a temp directory and file with valid JSON content
- Passes the file path to NewJob
- Validates both ID extraction and Data() retrieval
This properly exercises the new file-backed implementation.
46-61: Good error case coverage for missing file.
TestJob_DataError validates that Data() returns an error when the backing file doesn't exist. This is important for handling edge cases like deleted queue files. As per coding guidelines, this covers the failure path appropriately.
internal/service/scheduler/queue_processor.go (2)
290-298: LGTM! This addresses the root cause of the reported issue.
The explicit error handling for item.Data() resolves the "name or path is required" error mentioned in issue #1437. The pattern correctly retrieves data once, handles errors, and reuses the data reference.
Note: Similar to line 276, verify that Data() never returns (nil, nil), to prevent a potential panic at line 298.
271-276: The code is safe as written.
The Data() method on execution.QueuedItemData follows the standard Go error handling contract: when it returns a non-nil error, the data pointer is nil; when it returns no error, the pointer is guaranteed to be non-nil. The implementation in internal/persistence/filequeue/job.go confirms this by returning &itemData.DAGRun (always a valid pointer) on success or (nil, error) on failure. Dereferencing *data after the if err != nil check is safe and requires no additional nil validation.
Likely an incorrect or invalid review comment.
internal/persistence/filequeue/queue_test.go (1)
47-52: LGTM! Test updates correctly align with the new Data() API.
The tests are properly updated to handle the new error-returning signature of Data() and appropriately use ID() for verification after dequeue operations. The comments clearly explain why Data() cannot be called after dequeue (file is removed), which is helpful for maintainability.
Also applies to: 63-63, 92-95, 101-104, 126-127, 131-131, 146-149
internal/persistence/filequeue/store_test.go (1)
43-48: LGTM! Store test updates are consistent with the new API.
The changes mirror the pattern from queue_test.go and correctly handle the error-returning Data() method. Using ID() for verification after dequeue and adding error checks for Data() calls are both appropriate.
Also applies to: 79-87, 146-149
internal/service/frontend/api/v2/queues.go (2)
91-97: LGTM! Error handling for queued items is appropriate.
Skipping items that fail data retrieval is reasonable for a listing operation. The error handling prevents the reported "name or path is required" issue from affecting the queue list endpoint.
Note: Ensure Data() never returns (nil, nil) to prevent potential panic at line 97 when dereferencing *dagRunRef.
107-107: No changes needed.
The usage of dag.ProcGroup() at line 107 is correct and consistent with how queues are identified throughout the codebase. It respects the DAG's configured queue (if set via dag.Queue) rather than using the DAG name, which aligns with queue operations elsewhere.
Likely an incorrect or invalid review comment.
internal/persistence/filequeue/queuefile.go (2)
172-190: LGTM! Error handling for Data() retrieval is correct.
The code properly handles errors when retrieving job data and safely continues processing other items on failure. The pattern of calling Data() to validate file content before removal is safer than relying solely on filename-parsed metadata.
Note: While item.DAGRun.ID and item.DAGRun.Name are available from the Item struct, calling Data() validates the actual file content, which provides better correctness guarantees.
316-316: LGTM! The Item wrapper pattern is well-integrated.
The change to return []Item from listItems (lines 316, 326, 341-344) provides callers with both the file path and parsed metadata, enabling the file-backed data retrieval pattern used throughout the PR. This is a clean internal refactoring that supports the error-handling improvements.
Also applies to: 326-344
Actionable comments posted: 0
♻️ Duplicate comments (1)
internal/persistence/filequeue/queuefile.go (1)
55-59: Unexport the Item struct.
The Item struct is only used internally by listItems() and is not part of the public API. Rename it to item to reduce the exported API surface.
-// Item represents a queued item in the queue file
-type Item struct {
+// item represents a queued item in the queue file
+type item struct {
 	File string
 	ItemData
 }
Also update the listItems return type and usages:
-func (q *QueueFile) listItems(ctx context.Context) ([]Item, error) {
+func (q *QueueFile) listItems(ctx context.Context) ([]item, error) {
 ...
-	var items []Item
+	var items []item
 ...
-		items = append(items, Item{
+		items = append(items, item{
🧹 Nitpick comments (1)
internal/persistence/filequeue/job.go (1)
98-100: Simplify selector by removing embedded field name.
The static analysis tool correctly identifies that j.ItemData.DAGRun can be simplified to j.DAGRun since ItemData is embedded in Job.
 func (j *Job) Data() (*execution.DAGRunRef, error) {
-	return &j.ItemData.DAGRun, nil
+	return &j.DAGRun, nil
 }
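Why the two selectors are interchangeable: Go promotes the fields of an embedded struct to the outer type, so both expressions address the same field. The sketch below uses simplified copies of the types named in the review; the field layout is an assumption, and `viaEmbedded` / `viaPromoted` are hypothetical helper names.

```go
package main

import "fmt"

// Simplified copies of the types named in the review (layout assumed).
type DAGRunRef struct{ Name, ID string }

type ItemData struct {
	FileName string
	DAGRun   DAGRunRef
}

// Job embeds ItemData, so ItemData's fields are promoted onto Job.
type Job struct {
	id string
	ItemData
}

// viaEmbedded spells out the embedded field name (what QF1008 flags).
func (j *Job) viaEmbedded() *DAGRunRef { return &j.ItemData.DAGRun }

// viaPromoted relies on field promotion and addresses the same field.
func (j *Job) viaPromoted() *DAGRunRef { return &j.DAGRun }

func main() {
	j := &Job{id: "x", ItemData: ItemData{DAGRun: DAGRunRef{Name: "etl", ID: "run-1"}}}
	fmt.Println(j.viaEmbedded() == j.viaPromoted()) // prints true
}
```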
📜 Review details
📒 Files selected for processing (5)
internal/persistence/filequeue/job.go (1 hunks)
internal/persistence/filequeue/job_test.go (1 hunks)
internal/persistence/filequeue/queuefile.go (10 hunks)
internal/persistence/filequeue/queuefile_test.go (3 hunks)
internal/persistence/filequeue/store.go (2 hunks)
🧬 Code graph analysis (4)
internal/persistence/filequeue/job_test.go (3)
internal/persistence/filequeue/queuefile.go (1)
ItemData(62-66)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/persistence/filequeue/job.go (1)
NewJob(23-31)
internal/persistence/filequeue/queuefile.go (3)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/core/execution/queue.go (1)
QueuedItemData(66-71)
internal/persistence/filequeue/job.go (2)
NewJob(23-31)
QueuedFile(15-21)
internal/persistence/filequeue/store.go (5)
internal/persistence/filequeue/job.go (1)
NewJob(23-31)
internal/runtime/data.go (1)
Data(17-20)
api/v2/api.gen.go (1)
Error(432-441)
internal/common/logger/tag/tag.go (2)
Error(20-22)
Name(271-273)
internal/common/logger/context.go (1)
Error(50-52)
internal/persistence/filequeue/job.go (3)
internal/core/execution/queue.go (1)
QueuedItemData(66-71)
internal/persistence/filequeue/queuefile.go (1)
ItemData(62-66)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
🪛 GitHub Check: Go Linter
internal/persistence/filequeue/job.go
[failure] 99-99:
QF1008: could remove embedded field "ItemData" from selector (staticcheck)
🔇 Additional comments (16)
internal/persistence/filequeue/job_test.go (4)
13-44: LGTM!
The test correctly validates the lazy-loading behavior of QueuedFile: it creates a temp file with proper JSON, verifies ID extraction from the filename, and confirms Data() returns the correct DAGRunRef fields.
46-55: LGTM!
Good coverage of the error path when the underlying file doesn't exist.
57-91: LGTM!
This test validates a critical aspect of the fix: ExtractJob() caches the data so that after the file is deleted (simulating what happens after Pop()), the job data remains accessible. This directly addresses the original issue.
93-130: LGTM!
The caching test properly validates that once Data() is called, subsequent calls use the cached value even if the file is deleted.
internal/persistence/filequeue/job.go (4)
13-21: LGTM!
The QueuedFile struct correctly implements lazy loading with proper synchronization via mutex. The interface satisfaction assertion ensures compile-time checking.
23-31: LGTM!
The constructor correctly derives the ID from the filename by stripping the extension, which aligns with how queue files are named.
47-69: LGTM!
The loadData() implementation correctly:
- Uses mutex to protect cache access
- Returns cached data on subsequent calls
- Wraps errors with context for debugging
71-83: LGTM!
ExtractJob() correctly creates a Job with a copy of the data, ensuring the job retains its data even after the source file is deleted.
internal/persistence/filequeue/store.go (2)
101-103: LGTM!
Correctly updated to use NewJob(file), which now handles lazy loading of queue item data.
185-195: LGTM!
Good error handling for the new Data() signature. Logging and skipping items that fail to load is appropriate for a listing operation.
internal/persistence/filequeue/queuefile_test.go (3)
49-54: LGTM!
Correctly updated to handle the new Data() signature that returns (*DAGRunRef, error) and properly asserts on the returned fields.
90-96: LGTM!
Test correctly validates Data() retrieval after FindByDAGRunID and verifies the ID prefix format derived from the filename.
126-130: LGTM!
Test correctly validates Data() retrieval after the PopByDAGRunID operation.
internal/persistence/filequeue/queuefile.go (3)
170-199: LGTM!
The PopByDAGRunID implementation correctly:
- Creates a QueuedFile via NewJob(item.File)
- Loads data to check for a matching DAGRunRef
- Calls ExtractJob() to cache data before file removal
- Removes the file only after successful extraction
- Logs errors and continues processing remaining items
255-280: LGTM!
The Pop implementation correctly extracts job data before removing the file, ensuring the returned item has cached data accessible even after the source file is deleted. This is the core fix for issue #1437.
334-362: LGTM!
The listItems function correctly wraps each ItemData with its file path in the new Item struct, enabling callers to create lazy-loading QueuedFile instances.
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal/persistence/filequeue/job.go (2)
47-69: Lazy loading and caching logic is sound
The loadData implementation (mutex, JSON read, unmarshal, cache) looks correct and concurrency-safe, and the shared cache avoids repeated disk reads. No correctness issues spotted here; any further optimization (e.g., sync.RWMutex for read-heavy access) would be optional and driven by profiling.
23-36: Clarify naming and comments around NewJob, QueuedFile, and Job
A few small consistency nits that may help future readers:
- NewJob returns a *QueuedFile, while there is also a Job type; this can be slightly confusing at call sites. Consider renaming to something like NewQueuedFile or adjusting the doc comment to clarify that it constructs the file-backed queued item wrapper.
- The comment "// ID implements models.QueuedJob." is out of date now that you're using execution.QueuedItemData.
- Both QueuedFile and Job assert execution.QueuedItemData. If only one of them is meant to be the primary public implementation, you might drop the other assertion or add a brief comment explaining why both are exposed.
These are purely polish; the code is correct as written.
Also applies to: 71-83, 85-92
📜 Review details
📒 Files selected for processing (2)
internal/persistence/filequeue/job.go (1 hunks)
internal/persistence/filequeue/queue.go (1 hunks)
🧬 Code graph analysis (1)
internal/persistence/filequeue/job.go (4)
internal/core/execution/queue.go (1)
QueuedItemData(66-71)
internal/persistence/filequeue/queuefile.go (1)
ItemData(62-66)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/persistence/filedagrun/dagrun.go (1)
DAGRun(48-52)
🔇 Additional comments (2)
internal/persistence/filequeue/queue.go (1)
88-100: Bulk append change in DequeueByDAGRunID looks good
Switching from per-item appends to items = append(items, popped...) is functionally equivalent, simpler, and slightly more efficient. No correctness or concurrency concerns here.
internal/persistence/filequeue/job.go (1)
39-45: All call sites of Data() already handle errors correctly
The behavior change to return (*execution.DAGRunRef, error) has been properly implemented across the codebase. A scan of all 18 usages confirms that every call site checks the error:
- Queue file operations in store.go, queuefile.go: error logged
- API endpoint in queues.go: error skipped gracefully with continue
- Scheduler in queue_processor.go: error logged
- CLI in dequeue.go: error propagated with context
- All tests: errors asserted with require.NoError()
No call sites proceed with a zero/empty DAGRunRef on error; all are safe.
@coderabbitai generate docstrings
✅ Actions performed
Initiated docstring generation; will generate only if new commits exist.
Note
Docstrings generation - SUCCESS
Docstrings generation was requested by @yottahmd. * #1457 (comment) The following files were modified: * `internal/cmd/dequeue.go` * `internal/persistence/filequeue/job.go` * `internal/persistence/filequeue/queuefile.go` <details> <summary>These files were ignored</summary> * `internal/integration/distributed_e2e_test.go` * `internal/integration/distributed_test.go` * `internal/persistence/filequeue/job_test.go` * `internal/persistence/filequeue/queue_test.go` * `internal/persistence/filequeue/queuefile_test.go` * `internal/persistence/filequeue/store_test.go` * `internal/service/scheduler/queue_processor_test.go` </details> <details> <summary>ℹ️ Note</summary><blockquote> CodeRabbit cannot perform edits on its own pull requests yet. </blockquote></details> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal/persistence/filequeue/queuefile.go (2)
46-55: NewQueueFile docs align with implementation, could add an explicit example
The constructor and compiled regex match the described pattern for queue files; to make this even clearer for future maintainers, consider adding a concrete sample filename (e.g., item_high_20250101_120000_123456789Z_<runID>.json) to the comment.
150-205: PopByDAGRunID logic correctly filters and unwraps matching jobs
Using NewQueuedFile(item.file) and checking data.Name/data.ID before ExtractJob() and file removal ensures only matching DAG runs are popped, while logging and continuing on errors avoids aborting the whole batch. Note that item.data.DAGRun already holds the parsed name/ID; if this becomes performance-sensitive, you could filter using item.data.DAGRun first and defer JSON loading to only the matching items.
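The suggested optimization (filter on filename-derived metadata first, load JSON only for matches) might be sketched like this. `queuedItem` here is a simplified, hypothetical shape with a `runID` string and a `load` closure; it is not the struct from the PR.

```go
package main

import "fmt"

// DAGRunRef mirrors the name/ID pair discussed in the review.
type DAGRunRef struct{ Name, ID string }

// queuedItem pairs cheap, filename-derived metadata with an expensive loader.
type queuedItem struct {
	runID string                     // assumed to be parsed from the filename
	load  func() (*DAGRunRef, error) // assumed to read and unmarshal the file
}

// findByRunID calls load only for items whose filename metadata matches,
// so non-matching items cost no file I/O.
func findByRunID(items []queuedItem, runID string) []DAGRunRef {
	var matches []DAGRunRef
	for _, it := range items {
		if it.runID != runID {
			continue
		}
		ref, err := it.load()
		if err != nil {
			fmt.Printf("skip %s: %v\n", it.runID, err)
			continue
		}
		matches = append(matches, *ref)
	}
	return matches
}

func main() {
	loads := 0
	mk := func(id string) queuedItem {
		return queuedItem{runID: id, load: func() (*DAGRunRef, error) {
			loads++ // counts how many files would have been read
			return &DAGRunRef{Name: "etl", ID: id}, nil
		}}
	}
	items := []queuedItem{mk("run-1"), mk("run-2"), mk("run-3")}
	fmt.Println(len(findByRunID(items, "run-2")), loads) // prints 1 1
}
```

Matching items are still loaded from disk, so their file content is validated before they are returned; only the non-matching items skip that check.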
📜 Review details
📒 Files selected for processing (5)
internal/cmd/dequeue.go (3 hunks)
internal/persistence/filequeue/job.go (1 hunks)
internal/persistence/filequeue/job_test.go (1 hunks)
internal/persistence/filequeue/queuefile.go (13 hunks)
internal/persistence/filequeue/store.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/persistence/filequeue/store.go
- internal/cmd/dequeue.go
- internal/persistence/filequeue/job_test.go
🧬 Code graph analysis (2)
internal/persistence/filequeue/queuefile.go (3)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
internal/core/execution/queue.go (1)
QueuedItemData(66-71)
internal/persistence/filequeue/job.go (2)
NewQueuedFile(26-34)
QueuedFile(15-21)
internal/persistence/filequeue/job.go (3)
internal/core/execution/queue.go (1)
QueuedItemData(66-71)
internal/persistence/filequeue/queuefile.go (1)
ItemData(63-67)
internal/core/execution/dagrun.go (1)
DAGRunRef(144-147)
🔇 Additional comments (6)
internal/persistence/filequeue/job.go (3)
15-34: QueuedFile wrapper and ID derivation look sound
Using the base filename without extension as the queued item ID and keeping the full path in file cleanly separates queue identity from DAG run identity and matches execution.QueuedItemData expectations. The constructor is straightforward and side-effect free, which is good for tests and callers.
41-72: Lazy load + cache implementation is correct and thread-safe
Data() delegating to loadData() with a sync.Mutex and a simple nil cache check ensures each file is read/unmarshalled at most once per QueuedFile, and the returned *DAGRunRef is stable because it points into the cached ItemData. Error wrapping clearly distinguishes read vs unmarshal failures.
74-101: Separation between on-disk QueuedFile and in-memory Job is well designed
ExtractJob() creating a Job that embeds ItemData (copying it) avoids tying consumers to filesystem concerns while still satisfying execution.QueuedItemData, and Job.Data() returning &j.DAGRun keeps the interface uniform with QueuedFile.Data().
internal/persistence/filequeue/queuefile.go (3)
57-60: queuedItem + listItems refactor improves clarity and preserves ordering
Wrapping each entry as {file, data ItemData} keeps the path and parsed metadata together, and sorting by data.QueuedAt maintains FIFO semantics based on the encoded timestamp. The shared parseQueueFileName helper is a nice centralization of filename parsing logic.
Also applies to: 335-371
233-282: Pop now returning QueuedItemData via Job is consistent and robust
Unwrapping the first item with NewQueuedFile(item.file).ExtractJob(), logging a detailed error on failure, and only removing the queue file after a successful unwrap provides safer pop semantics, while still satisfying the execution.QueuedItemData interface for callers.
374-410: parseQueueFileName helper correctly reconstructs ItemData from filenames
The regex groups and reconstruction of QueuedAt from (timestamp, nanos) match the format produced by queueFileName, and using filepath.Base(filepath.Dir(path)) for DAGRun.Name keeps the DAG name derivation consistent with the directory layout. Renaming the local variable to data improves readability over the previous generic name.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/persistence/filequeue/queuefile.go (1)
150-206: Consider more defensive handling of corrupted queue entries in Pop/PopByDAGRunID
Right now:
- Pop returns an error and leaves the first file in place if ExtractJob() fails, so a single corrupted JSON file can permanently block popping from that queue until manual cleanup.
- PopByDAGRunID logs and continues on Data()/ExtractJob() errors, which is more forgiving.
For better robustness, consider normalizing behavior (for example, logging and either deleting or quarantining clearly corrupted entries, or skipping to the next valid item) so a bad file cannot indefinitely block dequeues.
Also applies to: 208-231, 233-283, 285-309
📜 Review details
📒 Files selected for processing (3)
internal/cmd/dequeue.go (2 hunks)
internal/persistence/filequeue/job.go (1 hunks)
internal/persistence/filequeue/queuefile.go (12 hunks)
🧬 Code graph analysis (1)
internal/cmd/dequeue.go (1)
internal/runtime/data.go (1)
Data(17-20)
🔇 Additional comments (3)
internal/persistence/filequeue/job.go (1)
13-85: QueuedFile / Job implementations of QueuedItemData look solid
Lazy, file-backed loading with a per-instance mutex and caching is correct, and Job cleanly wraps ItemData while still satisfying execution.QueuedItemData. Error wrapping provides useful file context, and the Data() (*DAGRunRef, error) shape aligns with the updated queue API without introducing concurrency or correctness issues.
Also applies to: 87-101
internal/persistence/filequeue/queuefile.go (1)
46-55: Filename parsing + queuedItem metadata pipeline look correct
The combination of NewQueueFile's compiled regexp, queuedItem {file, data}, listItems, and parseQueueFileName correctly reconstructs ItemData from filenames, filters unexpected files, and sorts by QueuedAt. The regexes line up with queueFileName, and malformed names are logged and skipped rather than breaking queue operations. No issues spotted here.
Also applies to: 57-60, 335-372, 374-410
internal/cmd/dequeue.go (1)
48-71: dequeueFirst correctly adapts to the new QueuedItemData.Data() contract
The added queues-enabled guard, explicit nil-item check, and propagation of item.Data() errors are all appropriate, and passing the dereferenced DAGRunRef into dequeueDAGRun with alreadyDequeued=true matches the store semantics. This path now fails fast on configuration or dequeue/data issues without introducing new failure modes.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1457 +/- ##
==========================================
- Coverage 59.86% 59.82% -0.04%
==========================================
Files 188 188
Lines 21167 21299 +132
==========================================
+ Hits 12671 12743 +72
- Misses 7180 7229 +49
- Partials 1316 1327 +11
... and 9 files with indirect coverage changes
Continue to review full report in Codecov by Sentry.
resolves #1437
Co-authored by @kriyanshii
Summary by CodeRabbit
Bug Fixes
Refactor