-
Notifications
You must be signed in to change notification settings - Fork 715
fix: skip when start time > end time for delayed derived stream #8048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Subhra264
commented
Aug 19, 2025
- When delay is non-zero for derived stream, currently sometimes the start time becomes greater than the end time. This pr fixes that by moving to the next run where the end time eventually becomes greater than the start time and also follows the given delay.
- Currently when updating a derived stream, it deletes the old scheduled job, and hence the start time for the scheduled job changes. As a result, there can be duplicate results in the derived stream because of updates. This pr fixes this by just updating only the next_run_at field of the scheduled_job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Summary
This PR fixes two critical timing issues in derived stream processing when delays are configured. The first issue occurs when a derived stream has a non-zero delay - the temporal calculations can result in invalid time ranges where start_time > end_time, causing pipeline failures. The second issue involves duplicate results when updating derived stream pipelines because the system would delete and recreate scheduled jobs, resetting their timing state.
The solution involves three key changes:
-
Added helper methods to PipelineSource enum (
src/config/src/meta/pipeline/components.rs): Theis_scheduled()andis_realtime()methods provide clean type checking capabilities using idiomatic Rust pattern matching with thematches!macro. -
Modified pipeline update logic (
src/service/pipeline/mod.rs): Theupdate_pipelinefunction now conditionally deletes derived stream triggers only when the pipeline source type changes from scheduled to realtime, rather than always deleting on any update. This preserves the scheduled job'snext_run_attiming state. -
Enhanced scheduler validation (
src/service/alerts/scheduler/handlers.rs): Added comprehensive validation inhandle_derived_stream_triggersto detect invalid time ranges where start > end time. When detected, the system gracefully skips to the next scheduled run with proper status reporting (TriggerDataStatus::Skipped) and improved logging.
These changes work together to maintain temporal consistency in derived stream processing while preserving scheduled job state during updates, preventing both timing failures and data duplication.
Confidence score: 4/5
- This PR addresses well-defined timing bugs with a logical approach that preserves scheduler state
- Score reflects complex scheduler logic changes that require careful testing in production scenarios
- Pay close attention to the scheduler validation logic in handlers.rs for potential edge cases
3 files reviewed, no comments
- When delay is non-zero for derived stream, currently sometimes the start time becomes greater than the end time. This pr fixes that by moving to the next run where the end time eventually becomes greater than the start time and also follows the given delay. - Currently when updating a derived stream, it deletes the old scheduled job, and hence the start time for the scheduled job changes. As a result, there can be duplicate results in the derived stream because of updates. This pr fixes this by just updating only the next_run_at field of the scheduled_job.
- When delay is non-zero for derived stream, currently sometimes the start time becomes greater than the end time. This pr fixes that by moving to the next run where the end time eventually becomes greater than the start time and also follows the given delay. - Currently when updating a derived stream, it deletes the old scheduled job, and hence the start time for the scheduled job changes. As a result, there can be duplicate results in the derived stream because of updates. This pr fixes this by just updating only the next_run_at field of the scheduled_job.
… (#8065) - When delay is non-zero for derived stream, currently sometimes the start time becomes greater than the end time. This pr fixes that by moving to the next run where the end time eventually becomes greater than the start time and also follows the given delay. - Currently when updating a derived stream, it deletes the old scheduled job, and hence the start time for the scheduled job changes. As a result, there can be duplicate results in the derived stream because of updates. This pr fixes this by just updating only the next_run_at field of the scheduled_job.