-
Notifications
You must be signed in to change notification settings - Fork 713
fix: skip when start time > end time for delayed derived stream #8064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- When delay is non-zero for derived stream, currently sometimes the start time becomes greater than the end time. This pr fixes that by moving to the next run where the end time eventually becomes greater than the start time and also follows the given delay. - Currently when updating a derived stream, it deletes the old scheduled job, and hence the start time for the scheduled job changes. As a result, there can be duplicate results in the derived stream because of updates. This pr fixes this by just updating only the next_run_at field of the scheduled_job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Summary
This PR addresses critical timing issues with delayed derived streams in OpenObserve's pipeline system. The changes fix two related problems: (1) invalid time ranges where start time exceeds end time due to non-zero delays, and (2) duplicate results caused by unnecessary trigger deletion during pipeline updates.
The first fix introduces time range validation in the scheduler handlers that detects when start time is greater than the aligned end time. When this invalid condition occurs (typically due to delay configurations), the system now gracefully skips to the next scheduled run instead of attempting to process the invalid range. This prevents data corruption and ensures proper scheduling progression.
The second fix modifies the pipeline update logic to only delete the previous scheduled trigger when the pipeline source type actually changes from scheduled to realtime. Previously, any update to a scheduled pipeline would delete and recreate the trigger, resetting the start time and potentially causing duplicate results. Now, scheduled-to-scheduled updates preserve the original scheduling context by only updating the next_run_at field.
Additionally, the PR adds utility methods is_scheduled() and is_realtime() to the PipelineSource enum, improving code readability when checking pipeline source types. The changes also include enhanced logging with consistent trace ID formatting for better debugging capabilities.
These modifications integrate well with the existing derived stream evaluation system, working alongside the DerivedStreamExt trait and the existing save/delete functions in the alerts service. The fixes ensure data integrity while maintaining the flexibility of the pipeline scheduling system.
Confidence score: 4/5
- This PR addresses real timing issues with solid logic but involves complex scheduling code that could have edge cases
- Score reflects the critical nature of the fixes and the thorough approach to handling invalid time ranges and preserving scheduling context
- Pay close attention to the scheduler handlers file where the core timing validation logic was added
3 files reviewed, no comments
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
User description
start time becomes greater than the end time. This pr fixes that by
moving to the next run where the end time eventually becomes greater
than the start time and also follows the given delay.
job, and hence the start time for the scheduled job changes. As a
result, there can be duplicate results in the derived stream because of
updates. This pr fixes this by just updating only the next_run_at field
of the scheduled_job.
PR Type
Bug fix, Enhancement
Description
Prevent invalid time ranges with delay
Align end/scheduled times consistently
Respect free-trial by deferring runs a week
Avoid deleting triggers on updates
Diagram Walkthrough
File Walkthrough
components.rs
PipelineSource helpers for source type checkssrc/config/src/meta/pipeline/components.rs
is_scheduledandis_realtimemod.rs
Safer trigger deletion on pipeline source changesrc/service/pipeline/mod.rs
is_realtimeguard during updateshandlers.rs
Robust delayed scheduling and alignment fixessrc/service/alerts/scheduler/handlers.rs