
Conversation


@Subhra264 Subhra264 commented Aug 20, 2025

User description

  • When the delay is non-zero for a derived stream, the start time can currently end up greater than the end time. This PR fixes that by skipping to the next run, at which point the end time exceeds the start time while the configured delay is still honored (a worked example follows this list).
  • Currently, updating a derived stream deletes the old scheduled job, which changes the scheduled job's start time. As a result, updates can produce duplicate results in the derived stream. This PR fixes that by updating only the next_run_at field of the scheduled job.
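
For intuition, here is a minimal worked example of the first problem (the numbers are illustrative, not taken from the PR): with a short period and a larger delay, the window end (now minus delay) can fall before the start carried over from the previous run, so the range is invalid until a later run.

use chrono::{Duration, NaiveTime};

fn main() {
    // Illustrative values only: 5-minute period, 20-minute delay.
    let start = NaiveTime::from_hms_opt(12, 0, 0).unwrap(); // end of the previous window
    let now = NaiveTime::from_hms_opt(12, 5, 0).unwrap();   // this run's scheduled time
    let delay = Duration::minutes(20);

    let end = now - delay; // 11:45, earlier than start
    assert!(end < start);  // invalid window: the fix skips to the next run instead
    // By the 12:25 run, now - delay = 12:05 >= start, so processing resumes.
}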

PR Type

Bug fix, Enhancement


Description

  • Prevent invalid time ranges with delay

  • Align end/scheduled times consistently

  • Respect free-trial expiry by deferring runs by a week

  • Avoid deleting triggers on updates


Diagram Walkthrough

flowchart LR
  A["DerivedStream trigger handling"] -- "align end time" --> B["aligned_end_time computed"]
  B -- "start <= end ?" --> C["Proceed with eval"]
  B -- "start > end" --> D["Skip to next run"]
  D -- "set Skipped status" --> E["update trigger + usage"]
  C -- "evaluate + set times" --> F["update trigger/next_run"]
  G["Free trial check"] -- "expired" --> H["defer next_run_at by 7 days"]
  I["Pipeline update"] -- "Scheduled -> Realtime" --> J["delete old trigger only on source change"]

File Walkthrough

Relevant files

Enhancement

components.rs: PipelineSource helpers for source type checks
src/config/src/meta/pipeline/components.rs

  • Add helper methods is_scheduled and is_realtime
  • Keep existing default implementation
+10/-0

mod.rs: Safer trigger deletion on pipeline source change
src/service/pipeline/mod.rs

  • Delete previous scheduled trigger only when source changes
  • Add use of is_realtime guard during updates
+12/-9

Bug fix

handlers.rs: Robust delayed scheduling and alignment fixes
src/service/alerts/scheduler/handlers.rs

  • Defer runs by 7 days for expired free-trial orgs
  • Compute aligned end and supposed run times
  • Skip runs with invalid start/end due to delay; set Skipped
  • Reorder retry handling; adjust next_run and logging
  • Update trigger_data start/end based on alignment
  • Use aligned time for catch-up logic
+89/-39

@github-actions github-actions bot added the ☢️ Bug Something isn't working label Aug 20, 2025

@greptile-apps greptile-apps bot left a comment


Greptile Summary

This PR addresses critical timing issues with delayed derived streams in OpenObserve's pipeline system. The changes fix two related problems: (1) invalid time ranges where start time exceeds end time due to non-zero delays, and (2) duplicate results caused by unnecessary trigger deletion during pipeline updates.

The first fix introduces time range validation in the scheduler handlers that detects when start time is greater than the aligned end time. When this invalid condition occurs (typically due to delay configurations), the system now gracefully skips to the next scheduled run instead of attempting to process the invalid range. This prevents data corruption and ensures proper scheduling progression.

The second fix modifies the pipeline update logic to only delete the previous scheduled trigger when the pipeline source type actually changes from scheduled to realtime. Previously, any update to a scheduled pipeline would delete and recreate the trigger, resetting the start time and potentially causing duplicate results. Now, scheduled-to-scheduled updates preserve the original scheduling context by only updating the next_run_at field.
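
A minimal sketch of that scheduled-to-scheduled path (the function name and the Trigger type are assumptions for illustration; get_next_trigger_time and db::scheduler::update_trigger are taken from the excerpts below):

// Hedged sketch, not the exact PR code: on a scheduled -> scheduled update,
// keep the existing trigger (and its original start anchor) and only move next_run_at.
async fn reschedule_existing_trigger(
    mut trigger: Trigger,            // assumed type for the existing scheduled job
    derived_stream: &DerivedStream,  // updated pipeline configuration
) -> Result<(), anyhow::Error> {
    trigger.next_run_at = derived_stream.trigger_condition.get_next_trigger_time(
        false,
        derived_stream.tz_offset,
        false,
        None,
    )?;
    db::scheduler::update_trigger(trigger).await?; // no delete + recreate
    Ok(())
}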

Additionally, the PR adds utility methods is_scheduled() and is_realtime() to the PipelineSource enum, improving code readability when checking pipeline source types. The changes also include enhanced logging with consistent trace ID formatting for better debugging capabilities.
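
A sketch of those helpers (assuming Realtime and Scheduled are the enum's two variants; the exact bodies in components.rs may differ):

impl PipelineSource {
    /// True when the pipeline is driven by a scheduled (derived stream) source.
    pub fn is_scheduled(&self) -> bool {
        matches!(self, PipelineSource::Scheduled { .. })
    }

    /// True when the pipeline is driven by a realtime stream source.
    pub fn is_realtime(&self) -> bool {
        matches!(self, PipelineSource::Realtime { .. })
    }
}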

These modifications integrate well with the existing derived stream evaluation system, working alongside the DerivedStreamExt trait and the existing save/delete functions in the alerts service. The fixes ensure data integrity while maintaining the flexibility of the pipeline scheduling system.

Confidence score: 4/5

  • This PR addresses real timing issues with solid logic but involves complex scheduling code that could have edge cases
  • Score reflects the critical nature of the fixes and the thorough approach to handling invalid time ranges and preserving scheduling context
  • Pay close attention to the scheduler handlers file where the core timing validation logic was added

3 files reviewed, no comments


@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

In the free-trial branch, new_trigger.next_run_at is incremented by 7 days using chained unwrap() calls on try_days(7) and num_microseconds(). These can panic on overflow or unexpected None; consider safe handling with fallbacks or error propagation.

new_trigger.next_run_at += Duration::try_days(7).unwrap().num_microseconds().unwrap();
db::scheduler::update_trigger(new_trigger).await?;
return Ok(());
Logic Consistency

When skipping due to invalid time range, next_run_at is computed with get_next_trigger_time(..., None), ignoring the current trigger.next_run_at anchor. Verify this aligns with intended cadence and doesn't cause drift compared to other paths that pass Some(trigger.next_run_at).

new_trigger.next_run_at = derived_stream.trigger_condition.get_next_trigger_time(
    false,
    derived_stream.tz_offset,
    false,
    None,
)?;
trigger_data_stream.status = TriggerDataStatus::Skipped;
trigger_data_stream.end_time = aligned_end_time;
trigger_data_stream.next_run_at = new_trigger.next_run_at;
trigger_data_stream.start_time = start_time;
Update Semantics

Now deleting the existing scheduled trigger only when switching to realtime. If scheduled->scheduled updates occur without source change, old triggers are kept; confirm that elsewhere you only update next_run_at and won’t leave duplicate triggers active.

if pipeline.source.is_realtime() {
    // source changed, delete prev. trigger
    if let Err(error) = super::alerts::derived_streams::delete(
        &derived_stream,
        &existing_pipeline.name,
        &existing_pipeline.id,
    )
    .await
    {
        return Err(PipelineError::DeleteDerivedStream(error.to_string()));
    }
}

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Possible issue
Prevent time math panics

Avoid unwrapping on time conversion; a failed conversion will panic the scheduler.
Also guard against overflow when adding to next_run_at. If conversion fails, log and
fall back to a safe default delay.

src/service/alerts/scheduler/handlers.rs [1086-1095]

 if !is_org_in_free_trial_period(&trigger.org).await? {
     log::info!(
         "[Scheduler trace_id {scheduler_trace_id}] Skipping pipeline {} id {} in org {} because free trial expiry",
         pipeline.name,
         pipeline_id,
         trigger.org
     );
-    new_trigger.next_run_at += Duration::try_days(7).unwrap().num_microseconds().unwrap();
+    if let Some(days) = Duration::try_days(7) {
+        if let Some(delta) = days.num_microseconds() {
+            new_trigger.next_run_at = new_trigger.next_run_at.saturating_add(delta);
+        } else {
+            log::warn!("[SCHEDULER trace_id {scheduler_trace_id}] Failed to convert 7 days to microseconds; leaving next_run_at unchanged");
+        }
+    } else {
+        log::warn!("[SCHEDULER trace_id {scheduler_trace_id}] Failed to create 7-day duration; leaving next_run_at unchanged");
+    }
     db::scheduler::update_trigger(new_trigger).await?;
     return Ok(());
 }
Suggestion importance[1-10]: 8


Why: Replacing multiple unwraps with safe checks avoids potential panics during duration conversion and guards against overflow, which is critical in a scheduler path. The improved code matches the new hunk and preserves behavior while increasing robustness.

Impact: Medium
Guard period multiplication overflow

Check for potential underflow/overflow in period * 60 before passing it along. Large
period values could wrap and misalign scheduling, causing incorrect ranges.

src/service/alerts/scheduler/handlers.rs [1161-1171]

 let aligned_supposed_to_be_run_at = if !is_cron_frequency {
+    let period_secs = derived_stream
+        .trigger_condition
+        .period
+        .checked_mul(60)
+        .unwrap_or_else(|| {
+            log::warn!("[SCHEDULER trace_id {scheduler_trace_id}] period overflow when converting minutes to seconds; capping to i32::MAX");
+            i32::MAX
+        });
     TriggerCondition::align_time(
         supposed_to_be_run_at,
         derived_stream.tz_offset,
-        Some(derived_stream.trigger_condition.period * 60),
+        Some(period_secs),
     )
 } else {
     // For cron frequency, we don't need to align the end time as it is already aligned (the
     // cron crate takes care of it)
     TriggerCondition::align_time(supposed_to_be_run_at, derived_stream.tz_offset, None)
 };
Suggestion importance[1-10]: 6


Why: Adding checked_mul for converting minutes to seconds is a reasonable safety improvement against overflow and aligns with the code context. Impact is moderate since typical periods are likely small, but the change is accurate and low risk.

Impact: Low
General
Use aligned end in logs

Ensure end used in logs and error is the aligned value for consistency and to avoid
confusion; currently end may be the unaligned previous value. Use aligned_end_time
in the message to reflect the actual decision basis.

src/service/alerts/scheduler/handlers.rs [1307-1339]

 if start.is_none_or(|t0| t0 < aligned_end_time) {
     end = aligned_end_time;
 } else {
-    // either t0 = aligned_end_time or t0 > aligned_end_time
-    // in both cases, we need to skip to next run because, we should always use
-    // aligned_curr_time as the end time
-    // Invalid timerange, most probably due to non-zero delay
-    // Don't do any further processing, just skip to next run
     let start_time = start.unwrap();
+    let intended_end = aligned_end_time;
     log::warn!(
         "[SCHEDULER trace_id {scheduler_trace_id}] module key: {}, Invalid timerange. Skipping to next run. start: {}, end: {}",
         new_trigger.module_key,
         start_time,
-        end,
+        intended_end,
     );
     new_trigger.next_run_at = derived_stream.trigger_condition.get_next_trigger_time(
         false,
         derived_stream.tz_offset,
         false,
         None,
     )?;
     trigger_data_stream.status = TriggerDataStatus::Skipped;
-    trigger_data_stream.end_time = aligned_end_time;
+    trigger_data_stream.end_time = intended_end;
     trigger_data_stream.next_run_at = new_trigger.next_run_at;
     trigger_data_stream.start_time = start_time;
     trigger_data_stream.error = Some(format!(
         "Invalid timerange - start: {}, end: {}, should be fixed in the next run",
-        start_time, end,
+        start_time, intended_end,
     ));
     db::scheduler::update_trigger(new_trigger).await?;
     publish_triggers_usage(trigger_data_stream).await;
     return Ok(());
 }
Suggestion importance[1-10]: 5


Why: Using the aligned end time in logs and error messages improves clarity and consistency with decision logic. It’s a minor correctness/readability tweak; the code changes align with the PR’s new logic.

Impact: Low

@Subhra264 Subhra264 merged commit 6c848af into main Sep 12, 2025
28 checks passed
@Subhra264 Subhra264 deleted the delay_derived_stream branch September 12, 2025 08:48

Labels

☢️ Bug (Something isn't working), Review effort 3/5
