
Conversation

@hengfeiyang hengfeiyang commented Sep 15, 2025

This PR changes the coordinator event stream from RetentionPolicy::Interest to RetentionPolicy::Limits and adds a new TTL:

ZO_NATS_EVENT_MAX_AGE=3600

With Interest retention, the consumer would hit the error:

missed idle heartbeat

after which the consumer is deleted by nats-server, so with the Interest policy we would lose events.
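
For orientation, here is a minimal sketch of creating a JetStream stream with limits-based retention and a max age using async_nats; the stream name and the env-var plumbing are illustrative, not the PR's actual code:

// Illustrative only: create a stream whose events expire after
// ZO_NATS_EVENT_MAX_AGE seconds instead of relying on Interest retention.
use std::time::Duration;

use async_nats::jetstream::{self, stream};

async fn create_event_stream(client: async_nats::Client) -> Result<(), async_nats::Error> {
    // Read the TTL from the environment, defaulting to 3600 seconds.
    let max_age_secs: u64 = std::env::var("ZO_NATS_EVENT_MAX_AGE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(3600);

    let js = jetstream::new(client);
    js.get_or_create_stream(stream::Config {
        name: "coordinator_events".to_string(), // hypothetical stream name
        retention: stream::RetentionPolicy::Limits,
        max_age: Duration::from_secs(max_age_secs),
        ..Default::default()
    })
    .await?;
    Ok(())
}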

PR Type

Enhancement, Bug fix


Description

  • Introduce unified infra::coordinator module

  • Add NATS event max-age configurability

  • Improve coordinator subscription reconnect behavior

  • Replace legacy imports, adjust logging


Diagram Walkthrough

flowchart LR
  cfg["Config: add nats.event_max_age"] -- "used by" --> events["Coordinator events: create_stream with max_age"]
  queue["Queue trait: max_age + deliver_policy"] -- "implemented by" --> natsq["NATS queue impl"]
  natsq -- "consume(deliver_policy)" --> sub["Coordinator subscribe with reconnect"]
  svc["Services (alerts, pipelines, prompts, templates, keys)"] -- "emit/watch via" --> coord["infra::coordinator"]

File Walkthrough

Relevant files

Enhancement (28 files)

  • nats.rs: Use new coordinator path during register (+1/-1)
  • config.rs: Add NATS event max age configuration (+2/-0)
  • prompt.rs: Switch AI prompt events to new coordinator (+2/-2)
  • ai_prompts.rs: New coordinator AI prompts event helpers [link]
  • alerts.rs: New alerts coordinator with watch and keys [link]
  • destinations.rs: New destination/template coordinator events [link]
  • events.rs: Stream max-age and resilient subscribe (+18/-7)
  • mod.rs: Introduce unified coordinator module [link]
  • pipelines.rs: New pipeline coordinator event helpers [link]
  • nats.rs: Migrate to new coordinator events API (+5/-5)
  • lib.rs: Rename module to coordinator (+1/-1)
  • mod.rs: Queue API adds max-age and deliver policy (+20/-1)
  • nats.rs: Implement max-age, deliver policy, errors (+46/-19)
  • nop.rs: Stub max-age and deliver policy methods (+28/-3)
  • ai_prompts.rs: Update watch prefix import path (+1/-1)
  • alert.rs: Use new coordinator for alert events (+8/-13)
  • destinations.rs: Use new coordinator for destination events (+2/-2)
  • templates.rs: Use new coordinator for template events (+2/-2)
  • keys.rs: Switch to new coordinator accessor (+1/-1)
  • pipeline.rs: Use new coordinator for pipeline events (+3/-7)
  • re_pattern.rs: Switch to new coordinator accessor (+1/-1)
  • ai_prompt.rs: Use new coordinator for prompt events (+2/-2)
  • alerts.rs: Use new coordinator for alert events (+4/-8)
  • cipher_keys.rs: Switch to new coordinator accessor (+1/-1)
  • destinations.rs: Use new coordinator for destination events (+2/-2)
  • pipelines.rs: Use new coordinator for pipeline events (+2/-2)
  • re_pattern.rs: Switch to new coordinator accessor (+1/-1)
  • templates.rs: Use new coordinator for template events (+2/-2)

Formatting (1 file)

  • batch_execution.rs: Improve error formatting with {e} (+1/-1)

@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Incomplete Stub

The NOP queue now requires create_with_max_age, create_with_retention_policy_and_max_age, and consume(deliver_policy) but all still return todo!(). Any runtime path using NOP (e.g., tests or non-NATS envs) will panic. Consider implementing no-op behaviors or gating usage to avoid panics.

async fn create_with_max_age(&self, _topic: &str, _max_age: std::time::Duration) -> Result<()> {
    self.create_with_retention_policy_and_max_age(_topic, RetentionPolicy::Limits, _max_age)
        .await
}

async fn create_with_retention_policy_and_max_age(
    &self,
    _topic: &str,
    _retention_policy: RetentionPolicy,
    _max_age: std::time::Duration,
) -> Result<()> {
    todo!()
}

async fn publish(&self, _topic: &str, _value: Bytes) -> Result<()> {
    todo!()
}

async fn consume(
    &self,
    _topic: &str,
    _deliver_policy: Option<DeliverPolicy>,
) -> Result<Arc<mpsc::Receiver<super::Message>>> {
    todo!()
}
Reconnect Semantics

On reconnect, deliver_policy switches to All, which can replay a large backlog depending on max_age. Validate that this is intended and won’t cause duplicate processing or excessive catch-up after transient failures.

let deliver_policy = if reconnect {
    queue::DeliverPolicy::All
} else {
    queue::DeliverPolicy::New
};
log::info!(
    "[COORDINATOR::EVENTS] subscribing to coordinator topic with deliver policy: {:?}",
    deliver_policy
);
let mut receiver: Arc<mpsc::Receiver<queue::Message>> = match queue
    .consume(COORDINATOR_STREAM, Some(deliver_policy))
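
If the full replay on reconnect is kept, one hypothetical mitigation is to make event handling idempotent, for example by tracking the last-applied stream sequence per key and skipping anything older; the types below are illustrative, not the PR's:

// Hypothetical dedup guard: remembers the highest stream sequence applied per
// event key so that a DeliverPolicy::All replay after reconnect is harmless.
use std::collections::HashMap;

#[derive(Default)]
struct ReplayGuard {
    last_applied: HashMap<String, u64>, // event key -> last applied stream sequence
}

impl ReplayGuard {
    // Returns true only if this (key, seq) has not been applied yet.
    fn should_apply(&mut self, key: &str, seq: u64) -> bool {
        match self.last_applied.get(key) {
            Some(&last) if seq <= last => false, // older or duplicate event: skip
            _ => {
                self.last_applied.insert(key.to_string(), seq);
                true
            }
        }
    }
}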
Error Handling Change

The consumer loop now breaks on message fetch errors instead of continuing. Ensure higher-level logic properly resubscribes and backs off to avoid silently stopping consumption.

    break;
}
Err(e) => {
    log::error!(
        "Failed to get nats consumer messages for stream {}: {}",
        stream_name,
        e
    );
    break;
}
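
A minimal sketch of the resubscribe-with-backoff shape this implies, using a placeholder channel rather than the PR's actual queue handle:

// Hypothetical outer loop: when the inner consumer task breaks and its channel
// closes, wait with exponential backoff and subscribe again.
use std::time::Duration;

use tokio::sync::mpsc;

async fn consume_forever(mut subscribe: impl FnMut() -> mpsc::Receiver<String>) {
    let mut backoff = Duration::from_secs(1);
    loop {
        let mut rx = subscribe();
        while let Some(event) = rx.recv().await {
            backoff = Duration::from_secs(1); // healthy again, reset backoff
            println!("handle event: {event}");
        }
        // rx returned None: the producer side broke, so back off and resubscribe
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(30));
    }
}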


@greptile-apps greptile-apps bot left a comment

Greptile Summary

This PR implements a comprehensive refactoring of the coordinator module structure alongside significant improvements to NATS queue management with configurable retention policies. The changes can be broken down into two main categories:

Module Restructuring: The entire cluster_coordinator module has been renamed and reorganized to coordinator, simplifying the module hierarchy across 20+ files. This includes moving files like src/infra/src/cluster_coordinator/ to src/infra/src/coordinator/ and updating all import paths throughout the codebase. The refactoring maintains identical functionality while providing cleaner, more concise module paths.

NATS Queue Enhancements: The Queue trait has been significantly expanded to support more sophisticated queue management. Key improvements include:

  • Addition of create_with_max_age and create_with_retention_policy_and_max_age methods for fine-grained retention control
  • Introduction of a DeliverPolicy enum (All, Last, New) to control message delivery patterns
  • Enhanced consume method that accepts optional delivery policies
  • New event_max_age configuration field (default 3600 seconds) for controlling individual event lifespans
  • Switch from Interest-based retention to time-based retention for coordinator events
  • Improved reconnection logic with intelligent deliver policy selection

The coordinator events system now uses configurable max age policies instead of fixed Interest retention, providing operators with better control over event lifecycle management. The system intelligently handles reconnections by using DeliverPolicy::All to catch up on missed events versus DeliverPolicy::New for fresh connections.
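
For reference, a sketch of the expanded Queue trait surface these changes imply, with signatures inferred from the stubs quoted in this review; Result and Message are simplified placeholders here, and the PR's actual trait may differ:

// Sketch only: the method set implied by the nop.rs stub and the nats.rs impl
// shown above. Uses async_trait and anyhow::Result for brevity.
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use tokio::sync::mpsc;

#[derive(Debug, Clone, Copy)]
pub enum RetentionPolicy { Limits, Interest }

#[derive(Debug, Clone, Copy)]
pub enum DeliverPolicy { All, Last, New }

pub enum Message { Payload(Bytes) } // placeholder for the backend message type

#[async_trait::async_trait]
pub trait Queue: Send + Sync {
    async fn create_with_max_age(&self, topic: &str, max_age: Duration) -> anyhow::Result<()>;
    async fn create_with_retention_policy_and_max_age(
        &self,
        topic: &str,
        retention_policy: RetentionPolicy,
        max_age: Duration,
    ) -> anyhow::Result<()>;
    async fn publish(&self, topic: &str, value: Bytes) -> anyhow::Result<()>;
    async fn consume(
        &self,
        topic: &str,
        deliver_policy: Option<DeliverPolicy>,
    ) -> anyhow::Result<Arc<mpsc::Receiver<Message>>>;
}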

These changes integrate well with OpenObserve's distributed architecture, where the coordinator manages cluster-wide events for alerts, pipelines, destinations, and AI prompts. The enhanced queue capabilities allow for more resilient message handling in distributed deployments while the module restructuring improves code maintainability.

Confidence score: 4/5

  • This PR requires careful review due to extensive module restructuring and queue interface changes
  • Score reflects the broad scope of changes affecting critical infrastructure components, though individual changes appear well-structured
  • Pay close attention to the NATS queue implementation and coordinator event handling logic

29 files reviewed, 3 comments


@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue
Avoid panics in NOP queue

The NOP queue now routes calls from multiple paths; leaving todo!() will panic at
runtime in non-NATS modes. Return benign Ok results or explicit errors instead of
panicking to avoid crashes when the NOP backend is active.

src/infra/src/queue/nop.rs [72-91]

-async fn create(&self, _topic: &str) -> Result<()> {
-    self.create_with_retention_policy(_topic, RetentionPolicy::Limits)
-        .await
-}
-...
 async fn create_with_retention_policy_and_max_age(
     &self,
     _topic: &str,
     _retention_policy: RetentionPolicy,
     _max_age: std::time::Duration,
 ) -> Result<()> {
-    todo!()
+    Ok(())
 }
-...
+
 async fn publish(&self, _topic: &str, _value: Bytes) -> Result<()> {
-    todo!()
+    Ok(())
 }
-...
+
 async fn consume(
     &self,
     _topic: &str,
     _deliver_policy: Option<DeliverPolicy>,
 ) -> Result<Arc<mpsc::Receiver<super::Message>>> {
-    todo!()
+    let (_tx, rx) = mpsc::channel(1);
+    Ok(Arc::new(rx))
 }
Suggestion importance[1-10]: 8


Why: Leaving todo!() will panic at runtime when the NOP backend is used; returning benign Ok results avoids crashes and matches the trait usage added in this PR. The suggested changes accurately modify the shown new hunk and improve robustness.

Impact: Medium
Ensure unique durable consumer names

Avoid using a fixed consumer_name for all topics; it will cause durable consumer
name collisions across different subjects and lead to missed or duplicated
deliveries. Generate a per-topic durable name (e.g., include stream_name and node
id) and use it consistently. Also, pass a unique queue group when applicable if
shared consumption is intended.

src/infra/src/queue/nats.rs [136-210]

 async fn consume(
     &self,
     topic: &str,
     deliver_policy: Option<queue::DeliverPolicy>,
 ) -> Result<Arc<mpsc::Receiver<super::Message>>> {
     let (tx, rx) = mpsc::channel(1024);
     let stream_name = format!("{}{}", self.prefix, format_key(topic));
-    let consumer_name = self.consumer_name.clone();
+    // Make consumer name unique per stream/topic to avoid collisions
+    let consumer_name = format!("{}_{}", stream_name, self.consumer_name);
     let is_durable = self.is_durable;
 
     {
         let client = get_nats_client().await.clone();
         let jetstream = jetstream::new(client);
         let stream = jetstream.get_stream(&stream_name).await.map_err(|e| {
             log::error!("Failed to get nats stream {}: {}", stream_name, e);
             Error::Message(format!("Failed to get nats stream {}: {}", stream_name, e))
         })?;
         let config = jetstream::consumer::pull::Config {
-            name: Some(consumer_name.to_string()),
+            name: Some(consumer_name.clone()),
             durable_name: if is_durable {
-                Some(consumer_name.to_string())
+                Some(consumer_name.clone())
             } else {
                 None
             },
             deliver_policy: get_deliver_policy(deliver_policy),
             ..Default::default()
         };
-        let consumer = stream
-            .get_or_create_consumer(&consumer_name, config)
-            .await
-            .map_err(|e| {
-                log::error!(
-                    "Failed to get_or_create_consumer for nats stream {}: {}",
-                    stream_name,
-                    e
-                );
-                Error::Message(format!(
-                    "Failed to get_or_create_consumer for nats stream {}: {}",
-                    stream_name, e
-                ))
-            })?;
+        let consumer = stream.get_or_create_consumer(&consumer_name, config).await.map_err(|e| {
+            log::error!(
+                "Failed to get_or_create_consumer for nats stream {}: {}",
+                stream_name,
+                e
+            );
+            Error::Message(format!(
+                "Failed to get_or_create_consumer for nats stream {}: {}",
+                stream_name, e
+            ))
+        })?;
         tokio::spawn(async move {
             loop {
                 let Ok(mut messages) = consumer.messages().await else {
-                    log::error!(
-                        "Failed to get nats consumer messages for stream {}",
-                        stream_name
-                    );
+                    log::error!("Failed to get nats consumer messages for stream {}", stream_name);
                     break;
                 };
                 let message = match messages.try_next().await {
                     Ok(Some(message)) => message,
                     Ok(None) => {
-                        log::warn!(
-                            "Nats consumer messages channel closed for stream {}",
-                            stream_name
-                        );
+                        log::warn!("Nats consumer messages channel closed for stream {}", stream_name);
                         break;
                     }
                     Err(e) => {
-                        log::error!(
-                            "Failed to get nats consumer messages for stream {}: {}",
-                            stream_name,
-                            e
-                        );
+                        log::error!("Failed to get nats consumer messages for stream {}: {}", stream_name, e);
                         break;
                     }
                 };
                 let message = super::Message::Nats(message);
-                tx.send(message).await.map_err(|e| {
+                if let Err(e) = tx.send(message).await {
                     log::error!("Failed to send message to channel: {}", e);
-                    Error::Message(format!("Failed to send message to channel: {}", e))
-                })?;
+                    break;
+                }
             }
-            Ok::<(), Error>(())
         });
     }
 
     Ok(Arc::new(rx))
 }
Suggestion importance[1-10]: 7


Why: Using a fixed consumer_name across topics can cause durable consumer collisions; making it per-topic is a reasonable, context-aware improvement. The mapping to the new hunk is correct, and the improved code aligns with the suggestion, though impact is moderate rather than critical.

Impact: Medium
Category: General
Prevent event replay on reconnect

When reconnecting, using DeliverPolicy::All may replay the entire stream and
duplicate processing. Switch to DeliverPolicy::Last (or New) on reconnect unless you
maintain sequence tracking. This prevents reprocessing old events after transient
errors.

src/infra/src/coordinator/events.rs [141-160]

 async fn subscribe(tx: mpsc::Sender<CoordinatorEvent>) -> Result<()> {
     let queue = queue::get_queue().await;
     let mut reconnect = false;
     loop {
         if config::cluster::is_offline() {
             break;
         }
 
+        // Avoid full replay on reconnect; resume from last/new
         let deliver_policy = if reconnect {
-            queue::DeliverPolicy::All
+            queue::DeliverPolicy::Last
         } else {
             queue::DeliverPolicy::New
         };
         log::info!(
             "[COORDINATOR::EVENTS] subscribing to coordinator topic with deliver policy: {:?}",
             deliver_policy
         );
         let mut receiver: Arc<mpsc::Receiver<queue::Message>> = match queue
             .consume(COORDINATOR_STREAM, Some(deliver_policy))
             .await
         {
             Ok(receiver) => receiver,
             Err(e) => {
                 log::error!("[COORDINATOR::EVENTS] failed to subscribe to coordinator topic: {e}");
                 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                 reconnect = true;
                 continue;
             }
         };
+        // ... rest unchanged ...
Suggestion importance[1-10]: 6


Why: The code intentionally replays all on reconnect; switching to Last reduces duplicates but risks missing events without offset tracking. It's a valid trade-off suggestion with moderate impact, not strictly required for correctness.

Impact: Low

@hengfeiyang hengfeiyang merged commit 237c757 into main Sep 15, 2025
30 of 40 checks passed
@hengfeiyang hengfeiyang deleted the feat/stream-stats branch September 15, 2025 12:06